source: repository/lib/Metabrik/Client/Elasticsearch.pm

Last change on this file was 976:d09557af2531, checked in by GomoR <gomor@…>, 2 months ago
  • update: address::generate: default to allow opening of 200 000 file descriptors
  • update: client::elasticsearch: delete_by_query Command adds an optional proceed Argument to resolve conflicts and proceed instead of returning without executing the delete action
File size: 99.3 KB
Line 
1#
2# $Id$
3#
4# client::elasticsearch Brik
5#
6package Metabrik::Client::Elasticsearch;
7use strict;
8use warnings;
9
10use base qw(Metabrik::Client::Rest);
11
12sub brik_properties {
13   return {
14      revision => '$Revision$',
15      tags => [ qw(unstable es es) ],
16      author => 'GomoR <GomoR[at]metabrik.org>',
17      license => 'http://opensource.org/licenses/BSD-3-Clause',
18      attributes => {
19         datadir => [ qw(datadir) ],
20         nodes => [ qw(node_list) ],
21         cxn_pool => [ qw(Sniff|Static|Static::NoPing) ],
22         date => [ qw(date) ],
23         index => [ qw(index) ],
24         type => [ qw(type) ],
25         from => [ qw(number) ],
26         size => [ qw(count) ],
27         max => [ qw(count) ],
28         max_flush_count => [ qw(count) ],
29         max_flush_size => [ qw(count) ],
30         rtimeout => [ qw(seconds) ],
31         sniff_rtimeout => [ qw(seconds) ],
32         try => [ qw(count) ],
33         use_bulk_autoflush => [ qw(0|1) ],
34         use_indexing_optimizations => [ qw(0|1) ],
35         csv_header => [ qw(fields) ],
36         csv_encoded_fields => [ qw(fields) ],
37         csv_object_fields => [ qw(fields) ],
38         _es => [ qw(INTERNAL) ],
39         _bulk => [ qw(INTERNAL) ],
40         _scroll => [ qw(INTERNAL) ],
41      },
42      attributes_default => {
43         nodes => [ qw(http://localhost:9200) ],
44         cxn_pool => 'Sniff',
45         from => 0,
46         size => 10,
47         max => 0,
48         index => '*',
49         type => '*',
50         rtimeout => 60,
51         sniff_rtimeout => 3,
52         try => 3,
53         max_flush_count => 1_000,
54         max_flush_size => 1_000_000,
55         use_bulk_autoflush => 1,
56         use_indexing_optimizations => 0,
57      },
58      commands => {
59         open => [ qw(nodes_list|OPTIONAL cxn_pool|OPTIONAL) ],
60         open_bulk_mode => [ qw(index|OPTIONAL type|OPTIONAL) ],
61         open_scroll_scan_mode => [ qw(index|OPTIONAL size|OPTIONAL) ],
62         open_scroll => [ qw(index|OPTIONAL size|OPTIONAL type|OPTIONAL query|OPTIONAL) ],
63         close_scroll => [ ],
64         total_scroll => [ ],
65         next_scroll => [ qw(count|OPTIONAL) ],
66         reindex => [ qw(index_source index_destination type_destination|OPTIONAL) ],
67         get_reindex_tasks => [ ],
68         cancel_reindex_task => [ qw(id) ],
69         get_taskid => [ qw(id) ],
70         show_reindex_progress => [ ],
71         loop_show_reindex_progress => [ qw(seconds|OPTIONAL) ],
72         index_document => [ qw(document index|OPTIONAL type|OPTIONAL hash|OPTIONAL id|OPTIONAL) ],
73         index_bulk => [ qw(document index|OPTIONAL type|OPTIONAL hash|OPTIONAL id|OPTIONAL) ],
74         index_bulk_from_list => [ qw(document_list index|OPTIONAL type|OPTIONAL hash|OPTIONAL) ],
75         update_document => [ qw(document id index|OPTIONAL type|OPTIONAL hash|OPTIONAL) ],
76         update_document_bulk => [ qw(document index|OPTIONAL type|OPTIONAL hash|OPTIONAL id|OPTIONAL) ],
77         bulk_flush => [ qw(index|OPTIONAL) ],
78         query => [ qw($query_hash index|OPTIONAL type|OPTIONAL hash|OPTIONAL) ],
79         count => [ qw(index|OPTIONAL type|OPTIONAL) ],
80         get_from_id => [ qw(id index|OPTIONAL type|OPTIONAL) ],
81         www_search => [ qw(query index|OPTIONAL type|OPTIONAL) ],
82         delete_index => [ qw(index|indices_list) ],
83         update_alias => [ qw(new_index alias) ],
84         delete_document => [ qw(index type id) ],
85         delete_by_query => [ qw($query_hash index type proceed|OPTIONAL) ],
86         show_indices => [ qw(string_filter|OPTIONAL) ],
87         show_nodes => [ ],
88         show_health => [ ],
89         show_recovery => [ ],
90         show_allocation => [ ],
91         list_indices => [ qw(regex|OPTIONAL) ],
92         get_indices => [ ],
93         get_index => [ qw(index|indices_list) ],
94         list_index_types => [ qw(index) ],
95         list_index_fields => [ qw(index) ],
96         list_indices_version => [ qw(index|indices_list) ],
97         open_index => [ qw(index|indices_list) ],
98         close_index => [ qw(index|indices_list) ],
99         get_aliases => [ qw(index) ],
100         put_alias => [ qw(index alias) ],
101         delete_alias => [ qw(index alias) ],
102         is_mapping_exists => [ qw(index mapping) ],
103         get_mappings => [ qw(index type|OPTIONAL) ],
104         create_index => [ qw(index) ],
105         create_index_with_mappings => [ qw(index mappings) ],
106         info => [ qw(nodes_list|OPTIONAL) ],
107         version => [ qw(nodes_list|OPTIONAL) ],
108         get_templates => [ ],
109         list_templates => [ ],
110         get_template => [ qw(name) ],
111         put_template => [ qw(name template) ],
112         put_template_from_json_file => [ qw(file) ],
113         update_template_from_json_file => [ qw(file) ],
114         get_settings => [ qw(index|indices_list|OPTIONAL name|names_list|OPTIONAL) ],
115         put_settings => [ qw(settings_hash index|indices_list|OPTIONAL) ],
116         set_index_readonly => [ qw(index|indices_list boolean|OPTIONAL) ],
117         reset_index_readonly => [ qw(index|indices_list|OPTIONAL) ],
118         list_index_readonly => [ ],
119         set_index_number_of_replicas => [ qw(index|indices_list number) ],
120         set_index_refresh_interval => [ qw(index|indices_list number) ],
121         get_index_settings => [ qw(index|indices_list) ],
122         get_index_readonly => [ qw(index|indices_list) ],
123         get_index_number_of_replicas => [ qw(index|indices) ],
124         get_index_refresh_interval => [ qw(index|indices_list) ],
125         get_index_number_of_shards => [ qw(index|indices_list) ],
126         delete_template => [ qw(name) ],
127         is_index_exists => [ qw(index) ],
128         is_type_exists => [ qw(index type) ],
129         is_document_exists => [ qw(index type document) ],
130         parse_error_string => [ qw(string) ],
131         refresh_index => [ qw(index) ],
132         export_as_csv => [ qw(index size|OPTIONAL callback|OPTIONAL) ],
133         import_from_csv => [ qw(input_csv index|OPTIONAL type|OPTIONAL hash|OPTIONAL callback|OPTIONAL) ],
134         import_from_csv_worker => [ qw(input_csv index|OPTIONAL type|OPTIONAL hash|OPTIONAL callback|OPTIONAL) ],
135         get_stats_process => [ ],
136         get_process => [ ],
137         get_cluster_state => [ ],
138         get_cluster_health => [ ],
139         get_cluster_settings => [ ],
140         put_cluster_settings => [ qw(settings) ],
141         count_green_indices => [ ],
142         count_yellow_indices => [ ],
143         count_red_indices => [ ],
144         list_green_indices => [ ],
145         list_yellow_indices => [ ],
146         list_red_indices => [ ],
147         count_indices => [ ],
148         list_indices_status => [ ],
149         count_shards => [ ],
150         count_size => [ ],
151         count_total_size => [ ],
152         count_count => [ ],
153         list_datatypes => [ ],
154         get_hits_total => [ ],
155         disable_shard_allocation => [ ],
156         enable_shard_allocation => [ ],
157         flush_synced => [ ],
158         create_snapshot_repository => [ qw(body repository_name|OPTIONAL) ],
159         create_shared_fs_snapshot_repository => [ qw(location
160            repository_name|OPTIONAL) ],
161         get_snapshot_repositories => [ ],
162         get_snapshot_status => [ ],
163         delete_snapshot_repository => [ qw(repository_name) ],
164         create_snapshot => [ qw(snapshot_name|OPTIONAL repository_name|OPTIONAL
165            body|OPTIONAL) ],
166         create_snapshot_for_indices => [ qw(indices snapshot_name|OPTIONAL
167            repository_name|OPTIONAL) ],
168         is_snapshot_finished => [ ],
169         get_snapshot_state => [ ],
170         get_snapshot => [ qw(snapshot_name|OPTIONAL repository_name|OPTIONAL) ],
171         delete_snapshot => [ qw(snapshot_name repository_name) ],
172         restore_snapshot => [ qw(snapshot_name repository_name body|OPTIONAL) ],
173         restore_snapshot_for_indices => [ qw(indices snapshot_name repository_name) ],
174      },
175      require_modules => {
176         'Metabrik::String::Json' => [ ],
177         'Metabrik::File::Csv' => [ ],
178         'Metabrik::File::Json' => [ ],
179         'Metabrik::File::Dump' => [ ],
180         'Metabrik::Format::Number' => [ ],
181         'Metabrik::Worker::Parallel' => [ ],
182         'Search::Elasticsearch' => [ ],
183      },
184   };
185}
186
187sub brik_preinit {
188   my $self = shift;
189
190   eval("use Search::Elasticsearch;");
191   if ($Search::Elasticsearch::VERSION < 5) {
192      $self->log->error("brik_preinit: please upgrade Search::Elasticsearch module ".
193         "with: run perl::module install Search::Elasticsearch");
194   }
195
196   return $self->SUPER::brik_preinit;
197}
198
199sub open {
200   my $self = shift;
201   my ($nodes, $cxn_pool) = @_;
202
203   $nodes ||= $self->nodes;
204   $cxn_pool ||= $self->cxn_pool;
205   $self->brik_help_run_undef_arg('open', $nodes) or return;
206   $self->brik_help_run_undef_arg('open', $cxn_pool) or return;
207   $self->brik_help_run_invalid_arg('open', $nodes, 'ARRAY') or return;
208   $self->brik_help_run_empty_array_arg('open', $nodes) or return;
209
210   for my $node (@$nodes) {
211      if ($node !~ m{https?://}) {
212         return $self->log->error("open: invalid node[$node], must start with http(s)");
213      }
214   }
215
216   my $timeout = $self->rtimeout;
217
218   my $nodes_str = join('|', @$nodes);
219   $self->log->debug("open: using nodes [$nodes_str]");
220
221   #
222   # Timeout description here:
223   #
224   # Search::Elasticsearch::Role::Cxn
225   #
226
227   my $es = Search::Elasticsearch->new(
228      nodes => $nodes,
229      cxn_pool => $cxn_pool,
230      timeout => $timeout,
231      max_retries => $self->try,
232      retry_on_timeout => 1,
233      sniff_timeout => $self->sniff_rtimeout, # seconds, default 1
234      request_timeout => 60,  # seconds, default 30
235      ping_timeout => 5,  # seconds, default 2
236      dead_timeout => 120,  # seconds, detault 60
237      max_dead_timeout => 3600,  # seconds, default 3600
238      sniff_request_timeout => 15, # seconds, default 2
239      #trace_to => 'Stderr',  # For debug purposes
240   );
241   if (! defined($es)) {
242      return $self->log->error("open: failed");
243   }
244
245   $self->_es($es);
246
247   return $nodes;
248}
249
250#
251# Search::Elasticsearch::Client::5_0::Bulk
252#
253sub open_bulk_mode {
254   my $self = shift;
255   my ($index, $type) = @_;
256
257   $index ||= $self->index;
258   $type ||= $self->type;
259   my $es = $self->_es;
260   $self->brik_help_run_undef_arg('open', $es) or return;
261   $self->brik_help_run_undef_arg('open_bulk_mode', $index) or return;
262   $self->brik_help_run_undef_arg('open_bulk_mode', $type) or return;
263
264   my %args = (
265      index => $index,
266      type => $type,
267      on_error => sub {
268         #my ($action, $response, $i) = @_;
269
270         #print Data::Dumper::Dumper($action)."\n";
271         #print Data::Dumper::Dumper($response)."\n";
272         #print Data::Dumper::Dumper($i)."\n";
273         print Data::Dumper::Dumper(\@_)."\n";
274      },
275   );
276
277   if ($self->use_bulk_autoflush) {
278      my $max_count = $self->max_flush_count || 1_000;
279      my $max_size = $self->max_flush_size || 1_000_000;
280
281      $args{max_count} = $max_count;
282      $args{max_size} = $max_size;
283      $args{max_time} = 0;
284
285      $self->log->info("open_bulk_mode: opening with max_flush_count [$max_count] and ".
286         "max_flush_size [$max_size]");
287   }
288   else {
289      $args{max_count} = 0;
290      $args{max_size} = 0;
291      $args{max_time} = 0;
292      $args{on_error} = undef;
293      #$args{on_success} = sub {
294         #my ($action, $response, $i) = @_;
295      #};
296
297      $self->log->info("open_bulk_mode: opening without automatic flushing");
298   }
299
300   my $bulk;
301   eval {
302      $bulk = $es->bulk_helper(%args);
303   };
304   if ($@) {
305      chomp($@);
306      return $self->log->error("open_bulk_mode: failed: [$@]");
307   }
308
309   $self->_bulk($bulk);
310
311   return $self->nodes;
312}
313
314sub open_scroll_scan_mode {
315   my $self = shift;
316   my ($index, $size) = @_;
317
318   my $version = $self->version or return;
319   if ($version ge "5.0.0") {
320      return $self->log->error("open_scroll_scan_mode: Command not supported for ES version ".
321         "$version, try open_scroll Command instead");
322   }
323
324   $index ||= $self->index;
325   $size ||= $self->size;
326   my $es = $self->_es;
327   $self->brik_help_run_undef_arg('open', $es) or return;
328   $self->brik_help_run_undef_arg('open_scroll_scan_mode', $index) or return;
329   $self->brik_help_run_undef_arg('open_scroll_scan_mode', $size) or return;
330
331   my $scroll;
332   eval {
333      $scroll = $es->scroll_helper(
334         index => $index,
335         search_type => 'scan',
336         size => $size,
337      );
338   };
339   if ($@) {
340      chomp($@);
341      return $self->log->error("open_scroll_scan_mode: failed: $@");
342   }
343
344   $self->_scroll($scroll);
345
346   return $self->nodes;
347}
348
349#
350# Search::Elasticsearch::Client::5_0::Scroll
351#
352sub open_scroll {
353   my $self = shift;
354   my ($index, $size, $type, $query) = @_;
355
356   my $version = $self->version or return;
357   if ($version lt "5.0.0") {
358      return $self->log->error("open_scroll: Command not supported for ES version ".
359         "$version, try open_scroll_scan_mode Command instead");
360   }
361
362   $query ||= { query => { match_all => {} } };
363   $index ||= $self->index;
364   $type ||= $self->type;
365   $size ||= $self->size;
366   my $es = $self->_es;
367   $self->brik_help_run_undef_arg('open', $es) or return;
368   $self->brik_help_run_undef_arg('open_scroll', $index) or return;
369   $self->brik_help_run_undef_arg('open_scroll', $size) or return;
370
371   my $timeout = $self->rtimeout;
372
373   my %args = (
374      scroll => "${timeout}s",
375      scroll_in_qs => 1,  # By default (0), pass scroll_id in request body. When 1, pass
376                          # it in query string.
377      index => $index,
378      size => $size,
379      body => $query,
380   );
381   if ($type ne '*') {
382      $args{type} = $type;
383   }
384
385   #
386   # https://www.elastic.co/guide/en/elasticsearch/reference/current/search-request-scroll.html
387   #
388   my $scroll;
389   eval {
390      $scroll = $es->scroll_helper(%args);
391   };
392   if ($@) {
393      chomp($@);
394      return $self->log->error("open_scroll: failed: $@");
395   }
396
397   $self->_scroll($scroll);
398
399   $self->log->verbose("open_scroll: opened with size [$size] and timeout [${timeout}s]");
400
401   return $self->nodes;
402}
403
404#
405# Search::Elasticsearch::Client::5_0::Scroll
406#
407sub close_scroll {
408   my $self = shift;
409
410   my $scroll = $self->_scroll;
411   if (! defined($scroll)) {
412      return 1;
413   }
414
415   $scroll->finish;
416   $self->_scroll(undef);
417
418   return 1;
419}
420
421sub total_scroll {
422   my $self = shift;
423
424   my $scroll = $self->_scroll;
425   $self->brik_help_run_undef_arg('open_scroll', $scroll) or return;
426
427   my $total;
428   eval {
429      $total = $scroll->total;
430   };
431   if ($@) {
432      chomp($@);
433      return $self->log->error("total_scroll: failed with: [$@]");
434   }
435
436   return $total;
437}
438
439sub next_scroll {
440   my $self = shift;
441   my ($count) = @_;
442
443   $count ||= 1;
444
445   my $scroll = $self->_scroll;
446   $self->brik_help_run_undef_arg('open_scroll', $scroll) or return;
447
448   my $next;
449   eval {
450      if ($count > 1) {
451         my @docs = $scroll->next($count);
452         if (@docs > 0) {
453            $next = \@docs;
454         }
455      }
456      else {
457         $next = $scroll->next;
458      }
459   };
460   if ($@) {
461      chomp($@);
462      return $self->log->error("next_scroll: failed with: [$@]");
463   }
464
465   return $next;
466}
467
468#
469# Search::Elasticsearch::Client::5_0::Direct
470#
471sub index_document {
472   my $self = shift;
473   my ($doc, $index, $type, $hash, $id) = @_;
474
475   $index ||= $self->index;
476   $type ||= $self->type;
477   my $es = $self->_es;
478   $self->brik_help_run_undef_arg('open', $es) or return;
479   $self->brik_help_run_undef_arg('index_document', $doc) or return;
480   $self->brik_help_run_invalid_arg('index_document', $doc, 'HASH') or return;
481   $self->brik_help_set_undef_arg('index', $index) or return;
482   $self->brik_help_set_undef_arg('type', $type) or return;
483
484   my %args = (
485      index => $index,
486      type => $type,
487      body => $doc,
488   );
489   if (defined($id)) {
490      $args{id} = $id;
491   }
492
493   if (defined($hash)) {
494      $self->brik_help_run_invalid_arg('index_document', $hash, 'HASH') or return;
495      %args = ( %args, %$hash );
496   }
497
498   my $r;
499   eval {
500      $r = $es->index(%args);
501   };
502   if ($@) {
503      chomp($@);
504      return $self->log->error("index_document: index failed for index [$index]: [$@]");
505   }
506
507   return $r;
508}
509
510#
511# https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-reindex.html
512#
513sub reindex {
514   my $self = shift;
515   my ($index, $new, $type) = @_;
516
517   my $es = $self->_es;
518   $self->brik_help_run_undef_arg('open', $es) or return;
519   $self->brik_help_run_undef_arg('reindex', $index) or return;
520   $self->brik_help_run_undef_arg('reindex', $new) or return;
521
522   my %args = (
523      body => {
524         conflicts => 'proceed',
525         source => { index => $index },
526         dest => { index => $new },
527      },
528      wait_for_completion => 'false',  # Immediately return the task.
529   );
530
531   # Change the type for destination doc
532   if (defined($type)) {
533      $args{body}{dest}{type} = $type;
534   }
535
536   my $r;
537   eval {
538      $r = $es->reindex(%args);
539   };
540   if ($@) {
541      chomp($@);
542      return $self->log->error("reindex: reindex failed for index [$index]: [$@]");
543   }
544
545   return $r;
546}
547
548#
549# List reindex tasks
550#
551# curl -X GET "localhost:9200/_tasks?detailed=true&actions=*reindex" | jq .
552#
553# Cancel reindex task
554#
555# curl -X POST "localhost:9200/_tasks/7VelPnOxQm21HtuJNFUAvQ:120914725/_cancel" | jq .
556#
557
558#
559# Search::Elasticsearch::Client::6_0::Direct::Tasks
560#
561sub get_reindex_tasks {
562   my $self = shift;
563
564   my $es = $self->_es;
565   $self->brik_help_run_undef_arg('open', $es) or return;
566
567   my $t = $es->tasks;
568
569   my $list = $t->list;
570   my $nodes = $list->{nodes};
571   if (! defined($nodes)) {
572      return $self->log->error("get_reindex_tasks: no nodes found");
573   }
574
575   my %tasks = ();
576   for my $node (keys %$nodes) {
577      for my $id (keys %{$nodes->{$node}}) {
578         my $tasks = $nodes->{$node}{tasks};
579         for my $task (keys %$tasks) {
580            my $action = $tasks->{$task}{action};
581            if ($action eq 'indices:data/write/reindex' && !exists($tasks{$task})) {
582               $tasks{$task} = $tasks->{$task};
583            }
584         }
585      }
586   }
587
588   return \%tasks;
589}
590
591sub cancel_reindex_task {
592   my $self = shift;
593   my ($id) = @_;
594
595   my $es = $self->_es;
596   $self->brik_help_run_undef_arg('open', $es) or return;
597   $self->brik_help_run_undef_arg('cancel_reindex_task', $id) or return;
598
599   my $t = $es->tasks;
600
601   return $t->cancel(task_id => $id);
602}
603
604sub get_taskid {
605   my $self = shift;
606   my ($id) = @_;
607
608   my $es = $self->_es;
609   $self->brik_help_run_undef_arg('open', $es) or return;
610   $self->brik_help_run_undef_arg('get_taskid', $id) or return;
611
612   my $t = $es->tasks;
613
614   return $t->get(task_id => $id);
615}
616
617sub show_reindex_progress {
618   my $self = shift;
619
620   my $es = $self->_es;
621   $self->brik_help_run_undef_arg('open', $es) or return;
622
623   my $tasks = $self->get_reindex_tasks or return;
624   if (! keys %$tasks) {
625      $self->log->info("show_reindex_progress: no reindex task in progress");
626      return 0;
627   }
628
629   for my $id (keys %$tasks) {
630      my $task = $self->get_taskid($id) or next;
631
632      my $status = $task->{task}{status};
633      my $desc = $task->{task}{description};
634      my $total = $status->{total};
635      my $created = $status->{created};
636      my $deleted = $status->{deleted};
637      my $updated = $status->{updated};
638
639      my $perc = ($created + $deleted + $updated) / $total * 100;
640
641      printf("> Task [%s]: %.02f%%\n", $desc, $perc);
642      print "created[$created] deleted[$deleted] updated[$updated] total[$total]\n";
643   }
644
645   return 1;
646}
647
648sub loop_show_reindex_progress {
649   my $self = shift;
650   my ($sec) = @_;
651
652   $sec ||= 60;
653   my $es = $self->_es;
654   $self->brik_help_run_undef_arg('open', $es) or return;
655
656   while (1) {
657      $self->show_reindex_progress or return;
658      sleep($sec);
659   }
660
661   return 1;
662}
663
664sub reindex_with_mapping_from_json_file {
665   my $self = shift;
666   my ($index, $new, $file) = @_;
667
668   my $es = $self->_es;
669   $self->brik_help_run_undef_arg('open', $es) or return;
670   $self->brik_help_run_undef_arg('reindex_with_mapping_from_json_file', $index)
671      or return;
672   $self->brik_help_run_undef_arg('reindex_with_mapping_from_json_file', $new) or return;
673   $self->brik_help_run_undef_arg('reindex_with_mapping_from_json_file', $file) or return;
674   $self->brik_help_run_file_not_found('reindex_with_mapping_from_json_file', $file)
675      or return;
676
677   my $fj = Metabrik::File::Json->new_from_brik_init($self) or return;
678   my $json = $fj->read($file) or return;
679
680   return $self->reindex($index, $new, $json);
681}
682
683#
684# Search::Elasticsearch::Client::5_0::Direct
685#
686sub update_document {
687   my $self = shift;
688   my ($doc, $id, $index, $type, $hash) = @_;
689
690   $index ||= $self->index;
691   $type ||= $self->type;
692   my $es = $self->_es;
693   $self->brik_help_run_undef_arg('open', $es) or return;
694   $self->brik_help_run_undef_arg('update_document', $doc) or return;
695   $self->brik_help_run_invalid_arg('update_document', $doc, 'HASH') or return;
696   $self->brik_help_run_undef_arg('update_document', $id) or return;
697   $self->brik_help_set_undef_arg('index', $index) or return;
698   $self->brik_help_set_undef_arg('type', $type) or return;
699
700   my %args = (
701      id => $id,
702      index => $index,
703      type => $type,
704      body => { doc => $doc },
705   );
706
707   if (defined($hash)) {
708      $self->brik_help_run_invalid_arg('update_document', $hash, 'HASH') or return;
709      %args = ( %args, %$hash );
710   }
711
712   my $r;
713   eval {
714      $r = $es->update(%args);
715   };
716   if ($@) {
717      chomp($@);
718      return $self->log->error("update_document: index failed for index [$index]: [$@]");
719   }
720
721   return $r;
722}
723
724#
725# Search::Elasticsearch::Client::5_0::Bulk
726#
727sub index_bulk {
728   my $self = shift;
729   my ($doc, $index, $type, $hash, $id) = @_;
730
731   my $bulk = $self->_bulk;
732   $index ||= $self->index;
733   $type ||= $self->type;
734   $self->brik_help_run_undef_arg('open_bulk_mode', $bulk) or return;
735   $self->brik_help_run_undef_arg('index_bulk', $doc) or return;
736   $self->brik_help_set_undef_arg('index', $index) or return;
737   $self->brik_help_set_undef_arg('type', $type) or return;
738
739   my %args = (
740      source => $doc,
741   );
742   if (defined($id)) {
743      $args{id} = $id;
744   }
745
746   if (defined($hash)) {
747      $self->brik_help_run_invalid_arg('index_bulk', $hash, 'HASH') or return;
748      %args = ( %args, %$hash );
749   }
750
751   my $r;
752   eval {
753      $r = $bulk->add_action(index => \%args);
754   };
755   if ($@) {
756      chomp($@);
757      my $p = $self->parse_error_string($@);
758      if (defined($p) && exists($p->{class})) {
759         my $class = $p->{class};
760         my $code = $p->{code};
761         my $node = $p->{node};
762         return $self->log->error("index_bulk: failed for index [$index] with error ".
763            "[$class] code [$code] for node [$node]");
764      }
765      else {
766         return $self->log->error("index_bulk: index failed for index [$index]: [$@]");
767      }
768   }
769
770   return $r;
771}
772
773#
774# Allows to index multiple docs at one time
775# $bulk->index({ source => $doc1 }, { source => $doc2 }, ...);
776#
777sub index_bulk_from_list {
778   my $self = shift;
779   my ($list, $index, $type, $hash) = @_;
780
781   my $bulk = $self->_bulk;
782   $index ||= $self->index;
783   $type ||= $self->type;
784   $self->brik_help_run_undef_arg('open_bulk_mode', $bulk) or return;
785   $self->brik_help_run_undef_arg('index_bulk_from_list', $list) or return;
786   $self->brik_help_run_invalid_arg('index_bulk_from_list', $list, 'ARRAY') or return;
787   $self->brik_help_run_empty_array_arg('index_bulk_from_list', $list) or return;
788   $self->brik_help_set_undef_arg('index', $index) or return;
789   $self->brik_help_set_undef_arg('type', $type) or return;
790
791   if (defined($hash)) {
792      $self->brik_help_run_invalid_arg('index_bulk_from_list', $hash, 'HASH') or return;
793   }
794
795   my @args = ();
796   for my $doc (@$list) {
797      my %args = (
798         source => $doc,
799      );
800      if (defined($hash)) {
801         %args = ( %args, %$hash );
802      }
803      push @args, \%args;
804   }
805
806   my $r;
807   eval {
808      $r = $bulk->index(@args);
809   };
810   if ($@) {
811      chomp($@);
812      my $p = $self->parse_error_string($@);
813      if (defined($p) && exists($p->{class})) {
814         my $class = $p->{class};
815         my $code = $p->{code};
816         my $node = $p->{node};
817         return $self->log->error("index_bulk: failed for index [$index] with error ".
818            "[$class] code [$code] for node [$node]");
819      }
820      else {
821         return $self->log->error("index_bulk: index failed for index [$index]: [$@]");
822      }
823   }
824
825   return $r;
826}
827
828sub update_document_bulk {
829   my $self = shift;
830   my ($doc, $index, $type, $hash, $id) = @_;
831
832   my $bulk = $self->_bulk;
833   $index ||= $self->index;
834   $type ||= $self->type;
835   $self->brik_help_run_undef_arg('open_bulk_mode', $bulk) or return;
836   $self->brik_help_run_undef_arg('update_document_bulk', $doc) or return;
837   $self->brik_help_set_undef_arg('index', $index) or return;
838   $self->brik_help_set_undef_arg('type', $type) or return;
839
840   my %args = (
841      index => $index,
842      type => $type,
843      doc => $doc,
844   );
845   if (defined($id)) {
846      $args{id} = $id;
847   }
848
849   if (defined($hash)) {
850      $self->brik_help_run_invalid_arg('update_document_bulk', $hash, 'HASH') or return;
851      %args = ( %args, %$hash );
852   }
853
854   my $r;
855   eval {
856      $r = $bulk->update(\%args);
857   };
858   if ($@) {
859      chomp($@);
860      my $p = $self->parse_error_string($@);
861      if (defined($p) && exists($p->{class})) {
862         my $class = $p->{class};
863         my $code = $p->{code};
864         my $node = $p->{node};
865         return $self->log->error("update_document_bulk: failed for index [$index] ".
866            "with error [$class] code [$code] for node [$node]");
867      }
868      else {
869         return $self->log->error("update_document_bulk: index failed for ".
870            "index [$index]: [$@]");
871      }
872   }
873
874   return $r;
875}
876
877#
878# We may have to call refresh_index after a bulk_flush, so we give an additional
879# optional Argument for given index.
880#
881sub bulk_flush {
882   my $self = shift;
883   my ($index) = @_;
884
885   my $bulk = $self->_bulk;
886   $self->brik_help_run_undef_arg('open_bulk_mode', $bulk) or return;
887
888   my $try = $self->try;
889
890RETRY:
891
892   my $r;
893   eval {
894      $r = $bulk->flush;
895   };
896   if ($@) {
897      chomp($@);
898      if (--$try == 0) {
899         my $p = $self->parse_error_string($@);
900         if (defined($p) && exists($p->{class})) {
901            my $class = $p->{class};
902            my $code = $p->{code};
903            my $node = $p->{node};
904            return $self->log->error("bulk_flush: failed after [$try] tries with error ".
905               "[$class] code [$code] for node [$node]");
906         }
907         else {
908            return $self->log->error("bulk_flush: failed after [$try]: [$@]");
909         }
910      }
911      $self->log->warning("bulk_flush: sleeping 10 seconds before retry cause error ".
912               "[$@]");
913      sleep 10;
914      goto RETRY;
915   }
916
917   if (defined($index)) {
918      $self->refresh_index($index);
919   }
920
921   return $r;
922}
923
924#
925# Search::Elasticsearch::Client::2_0::Direct
926# Search::Elasticsearch::Client::5_0::Direct
927#
928sub count {
929   my $self = shift;
930   my ($index, $type) = @_;
931
932   $index ||= $self->index;
933   $type ||= $self->type;
934   my $es = $self->_es;
935   $self->brik_help_run_undef_arg('open', $es) or return;
936
937   my %args = ();
938   if (defined($index) && $index ne '*') {
939      $args{index} = $index;
940   }
941   if (defined($type) && $type ne '*') {
942      $args{type} = $type;
943   }
944
945   #$args{body} = {
946      #query => {
947         #match => { title => 'Elasticsearch clients' },
948      #},
949   #}
950
951   my $r;
952   my $version = $self->version or return;
953   if ($version ge "5.0.0") {
954      eval {
955         $r = $es->count(%args);
956      };
957   }
958   else {
959      eval {
960         $r = $es->search(
961            index => $index,
962            type => $type,
963            search_type => 'count',
964            body => {
965               query => {
966                  match_all => {},
967               },
968            },
969         );
970      };
971   }
972   if ($@) {
973      chomp($@);
974      return $self->log->error("count: count failed for index [$index]: [$@]");
975   }
976
977   if ($version ge "5.0.0") {
978      if (exists($r->{count})) {
979         return $r->{count};
980      }
981   }
982   elsif (exists($r->{hits}) && exists($r->{hits}{total})) {
983      return $r->{hits}{total};
984   }
985
986   return $self->log->error("count: nothing found");
987}
988
989#
990# https://www.elastic.co/guide/en/elasticsearch/reference/current/full-text-queries.html
991# https://www.elastic.co/guide/en/elasticsearch/reference/current/search-request-body.html
992#
993# Example: my $q = { query => { term => { ip => "192.168.57.19" } } }
994#
995sub query {
996   my $self = shift;
997   my ($query, $index, $type, $hash) = @_;
998
999   $index ||= $self->index;
1000   $type ||= $self->type;
1001   my $es = $self->_es;
1002   $self->brik_help_run_undef_arg('open', $es) or return;
1003   $self->brik_help_run_undef_arg('query', $query) or return;
1004   $self->brik_help_set_undef_arg('index', $index) or return;
1005   $self->brik_help_set_undef_arg('type', $type) or return;
1006   $self->brik_help_run_invalid_arg('query', $query, 'HASH') or return;
1007
1008   my $timeout = $self->rtimeout;
1009
1010   my %args = (
1011      index => $index,
1012      body => $query,
1013   );
1014
1015   if (defined($hash)) {
1016      $self->brik_help_run_invalid_arg('query', $hash, 'HASH') or return;
1017      %args = ( %args, %$hash );
1018   }
1019
1020   if ($type ne '*') {
1021      $args{type} = $type;
1022   }
1023
1024   my $r;
1025   eval {
1026      $r = $es->search(%args);
1027   };
1028   if ($@) {
1029      chomp($@);
1030      return $self->log->error("query: failed for index [$index]: [$@]");
1031   }
1032
1033   return $r;
1034}
1035
1036sub get_from_id {
1037   my $self = shift;
1038   my ($id, $index, $type) = @_;
1039
1040   $index ||= $self->index;
1041   $type ||= $self->type;
1042   my $es = $self->_es;
1043   $self->brik_help_run_undef_arg('open', $es) or return;
1044   $self->brik_help_run_undef_arg('get_from_id', $id) or return;
1045   $self->brik_help_set_undef_arg('index', $index) or return;
1046   $self->brik_help_set_undef_arg('type', $type) or return;
1047
1048   my $r;
1049   eval {
1050      $r = $es->get(
1051         index => $index,
1052         type => $type,
1053         id => $id,
1054      );
1055   };
1056   if ($@) {
1057      chomp($@);
1058      return $self->log->error("get_from_id: get failed for index [$index]: [$@]");
1059   }
1060
1061   return $r;
1062}
1063
1064#
1065# https://www.elastic.co/guide/en/elasticsearch/reference/current/search-uri-request.html
1066#
1067sub www_search {
1068   my $self = shift;
1069   my ($query, $index, $type) = @_;
1070
1071   $index ||= $self->index;
1072   $type ||= $self->type;
1073   $self->brik_help_run_undef_arg('www_search', $query) or return;
1074   $self->brik_help_set_undef_arg('index', $index) or return;
1075   $self->brik_help_set_undef_arg('type', $type) or return;
1076
1077   my $from = $self->from;
1078   my $size = $self->size;
1079
1080   my $sj = Metabrik::String::Json->new_from_brik_init($self) or return;
1081
1082   my $nodes = $self->nodes;
1083   for my $node (@$nodes) {
1084      # http://localhost:9200/INDEX/TYPE/_search/?size=SIZE&q=QUERY
1085      my $url = "$node/$index";
1086      if ($type ne '*') {
1087         $url .= "/$type";
1088      }
1089      $url .= "/_search/?from=$from&size=$size&q=".$query;
1090
1091      my $get = $self->SUPER::get($url) or next;
1092      my $body = $get->{content};
1093
1094      my $decoded = $sj->decode($body) or next;
1095
1096      return $decoded;
1097   }
1098
1099   return;
1100}
1101
1102#
1103# Search::Elasticsearch::Client::2_0::Direct::Indices
1104#
1105sub delete_index {
1106   my $self = shift;
1107   my ($index) = @_;
1108
1109   my $es = $self->_es;
1110   $self->brik_help_run_undef_arg('open', $es) or return;
1111   $self->brik_help_run_undef_arg('delete_index', $index) or return;
1112   $self->brik_help_run_invalid_arg('delete_index', $index, 'ARRAY', 'SCALAR') or return;
1113
1114   my %args = (
1115      index => $index,
1116   );
1117
1118   my $r;
1119   eval {
1120      $r = $es->indices->delete(%args);
1121   };
1122   if ($@) {
1123      chomp($@);
1124      return $self->log->error("delete_index: delete failed for index [$index]: [$@]");
1125   }
1126
1127   return $r;
1128}
1129
1130#
1131# Search::Elasticsearch::Client::2_0::Direct::Indices
1132#
1133sub delete_document {
1134   my $self = shift;
1135   my ($index, $type, $id, $hash) = @_;
1136
1137   my $es = $self->_es;
1138   $self->brik_help_run_undef_arg('open', $es) or return;
1139   $self->brik_help_run_undef_arg('delete_document', $index) or return;
1140   $self->brik_help_run_undef_arg('delete_document', $type) or return;
1141   $self->brik_help_run_undef_arg('delete_document', $id) or return;
1142
1143   my %args = (
1144      index => $index,
1145      type => $type,
1146      id => $id,
1147   );
1148
1149   if (defined($hash)) {
1150      $self->brik_help_run_invalid_arg('delete_document', $hash, 'HASH') or return;
1151      %args = ( %args, %$hash );
1152   }
1153
1154   my $r;
1155   eval {
1156      $r = $es->delete(%args);
1157   };
1158   if ($@) {
1159      chomp($@);
1160      return $self->log->error("delete_document: delete failed for index [$index]: [$@]");
1161   }
1162
1163   return $r;
1164}
1165
1166#
1167# https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-delete-by-query.html
1168#
1169# Example: my $q = { query => { term => { ip => "192.168.57.19" } } }
1170#
1171sub delete_by_query {
1172   my $self = shift;
1173   my ($query, $index, $type, $proceed) = @_;
1174
1175   my $es = $self->_es;
1176   $self->brik_help_run_undef_arg('open', $es) or return;
1177   $self->brik_help_run_undef_arg('delete_by_query', $query) or return;
1178   $self->brik_help_run_undef_arg('delete_by_query', $index) or return;
1179   $self->brik_help_run_undef_arg('delete_by_query', $type) or return;
1180   $self->brik_help_run_invalid_arg('delete_by_query', $query, 'HASH') or return;
1181
1182   my $timeout = $self->rtimeout;
1183
1184   my %args = (
1185      index => $index,
1186      type => $type,
1187      body => $query,
1188   );
1189
1190   if (defined($proceed) && $proceed) {
1191      $args{conflicts} = 'proceed';
1192   }
1193
1194   my $r;
1195   eval {
1196      $r = $es->delete_by_query(%args);
1197   };
1198   if ($@) {
1199      chomp($@);
1200      return $self->log->error("delete_by_query: failed for index [$index]: [$@]");
1201   }
1202
1203   # This may fail, we ignore it.
1204   $self->refresh_index($index);
1205
1206   return $r;
1207}
1208
1209#
1210# Search::Elasticsearch::Client::2_0::Direct::Cat
1211#
1212# https://www.elastic.co/guide/en/elasticsearch/reference/current/cat-indices.html
1213#
1214sub show_indices {
1215   my $self = shift;
1216   my ($string) = @_;
1217
1218   my $es = $self->_es;
1219   $self->brik_help_run_undef_arg('open', $es) or return;
1220
1221   my $r;
1222   eval {
1223      $r = $es->cat->indices;
1224   };
1225   if ($@) {
1226      chomp($@);
1227      return $self->log->error("show_indices: failed: [$@]");
1228   }
1229
1230   my @lines = split(/\n/, $r);
1231
1232   if (@lines == 0) {
1233      $self->log->warning("show_indices: nothing returned, no index?");
1234   }
1235
1236   my @filtered = ();
1237   if (defined($string)) {
1238      for (@lines) {
1239         if (m{$string}) {
1240            push @filtered, $_;
1241         }
1242      }
1243      @lines = @filtered;
1244   }
1245
1246   return \@lines;
1247}
1248
1249#
1250# Search::Elasticsearch::Client::2_0::Direct::Cat
1251#
1252sub show_nodes {
1253   my $self = shift;
1254
1255   my $es = $self->_es;
1256   $self->brik_help_run_undef_arg('open', $es) or return;
1257
1258   my $r;
1259   eval {
1260      $r = $es->cat->nodes;
1261   };
1262   if ($@) {
1263      chomp($@);
1264      return $self->log->error("show_nodes: failed: [$@]");
1265   }
1266
1267   my @lines = split(/\n/, $r);
1268
1269   if (@lines == 0) {
1270      $self->log->warning("show_nodes: nothing returned, no nodes?");
1271   }
1272
1273   return \@lines;
1274}
1275
1276#
1277# Search::Elasticsearch::Client::2_0::Direct::Cat
1278#
1279sub show_health {
1280   my $self = shift;
1281
1282   my $es = $self->_es;
1283   $self->brik_help_run_undef_arg('open', $es) or return;
1284
1285   my $r;
1286   eval {
1287      $r = $es->cat->health;
1288   };
1289   if ($@) {
1290      chomp($@);
1291      return $self->log->error("show_health: failed: [$@]");
1292   }
1293
1294   my @lines = split(/\n/, $r);
1295
1296   if (@lines == 0) {
1297      $self->log->warning("show_health: nothing returned, no recovery?");
1298   }
1299
1300   return \@lines;
1301}
1302
1303#
1304# Search::Elasticsearch::Client::2_0::Direct::Cat
1305#
1306sub show_recovery {
1307   my $self = shift;
1308
1309   my $es = $self->_es;
1310   $self->brik_help_run_undef_arg('open', $es) or return;
1311
1312   my $r;
1313   eval {
1314      $r = $es->cat->recovery;
1315   };
1316   if ($@) {
1317      chomp($@);
1318      return $self->log->error("show_recovery: failed: [$@]");
1319   }
1320
1321   my @lines = split(/\n/, $r);
1322
1323   if (@lines == 0) {
1324      $self->log->warning("show_recovery: nothing returned, no index?");
1325   }
1326
1327   return \@lines;
1328}
1329
1330#
1331# curl -s 'localhost:9200/_cat/allocation?v'
1332#
1333sub show_allocation {
1334   my $self = shift;
1335
1336   my $es = $self->_es;
1337   $self->brik_help_run_undef_arg('open', $es) or return;
1338
1339   my $r;
1340   eval {
1341      $r = $es->cat->allocation;
1342   };
1343   if ($@) {
1344      chomp($@);
1345      return $self->log->error("show_allocation: failed: [$@]");
1346   }
1347
1348   my @lines = split(/\n/, $r);
1349
1350   if (@lines == 0) {
1351      $self->log->warning("show_allocation: nothing returned, no index?");
1352   }
1353
1354   return \@lines;
1355}
1356
1357sub list_indices {
1358   my $self = shift;
1359   my ($regex) = @_;
1360
1361   my $get = $self->get_indices or return;
1362
1363   my @indices = ();
1364   for (@$get) {
1365      if (defined($regex)) {
1366         if ($_->{index} =~ m{$regex}) {
1367            push @indices, $_->{index};
1368         }
1369      }
1370      else {
1371         push @indices, $_->{index};
1372      }
1373   }
1374
1375   return [ sort { $a cmp $b } @indices ];
1376}
1377
1378sub get_indices {
1379   my $self = shift;
1380
1381   my $lines = $self->show_indices or return;
1382   if (@$lines == 0) {
1383      $self->log->warning("get_indices: no index found");
1384      return [];
1385   }
1386
1387   #
1388   # Format depends on ElasticSearch version. We try to detect the format.
1389   #
1390   # 5.0.0:
1391   # "yellow open www-2016-08-14 BmNE9RaBRSCKqB5Oe8yZcw 5 1  146 0 251.8kb 251.8kb"
1392   #
1393   my @indices = ();
1394   for (@$lines) {
1395      my @t = split(/\s+/);
1396      if (@t == 10) {  # Version 5.0.0
1397         my $color = $t[0];
1398         my $state = $t[1];
1399         my $index = $t[2];
1400         my $id = $t[3];
1401         my $shards = $t[4];
1402         my $replicas = $t[5];
1403         my $count = $t[6];
1404         my $count2 = $t[7];
1405         my $total_size = $t[8];
1406         my $size = $t[9];
1407         push @indices, {
1408            color => $color,
1409            state => $state,
1410            index => $index,
1411            id => $id,
1412            shards => $shards,
1413            replicas => $replicas,
1414            count => $count,
1415            total_size => $total_size,
1416            size => $size,
1417         };
1418      }
1419      elsif (@t == 9) {
1420         my $index = $t[2];
1421         push @indices, {
1422            index => $index,
1423         };
1424      }
1425      elsif (@t == 8) {
1426         my $index = $t[1];
1427         push @indices, {
1428            index => $index,
1429         };
1430      }
1431   }
1432
1433   return \@indices;
1434}
1435
1436#
1437# Search::Elasticsearch::Client::5_0::Direct::Indices
1438#
1439sub get_index {
1440   my $self = shift;
1441   my ($index) = @_;
1442 
1443   my $es = $self->_es;
1444   $self->brik_help_run_undef_arg('open', $es) or return;
1445   $self->brik_help_run_undef_arg('get_index', $index) or return;
1446   $self->brik_help_run_invalid_arg('get_index', $index, 'ARRAY', 'SCALAR') or return;
1447
1448   my %args = (
1449      index => $index,
1450   );
1451
1452   my $r;
1453   eval {
1454      $r = $es->indices->get(%args);
1455   };
1456   if ($@) {
1457      chomp($@);
1458      return $self->log->error("get_index: get failed for index [$index]: [$@]");
1459   }
1460
1461   return $r;
1462}
1463
1464sub list_index_types {
1465   my $self = shift;
1466   my ($index) = @_;
1467
1468   my $es = $self->_es;
1469   $self->brik_help_run_undef_arg('open', $es) or return;
1470   $self->brik_help_run_undef_arg('list_index_types', $index) or return;
1471   $self->brik_help_run_invalid_arg('list_index_types', $index, 'SCALAR') or return;
1472
1473   my $r = $self->get_mappings($index) or return;
1474   if (keys %$r > 1) {
1475      return $self->log->error("list_index_types: multiple indices found, choose one");
1476   }
1477
1478   my @types = ();
1479   for my $this_index (keys %$r) {
1480      my $mappings = $r->{$this_index}{mappings};
1481      push @types, keys %$mappings;
1482   }
1483
1484   my %uniq = map { $_ => 1 } @types;
1485
1486   return [ sort { $a cmp $b } keys %uniq ];
1487}
1488
1489#
1490# By default, if you provide only one index and no type,
1491# all types will be merged (including _default_)
1492# If you specify one type (other than _default_), _default_ will be merged to it.
1493#
1494sub list_index_fields {
1495   my $self = shift;
1496   my ($index, $type) = @_;
1497
1498   my $es = $self->_es;
1499   $self->brik_help_run_undef_arg('open', $es) or return;
1500   $self->brik_help_run_undef_arg('list_index_fields', $index) or return;
1501   $self->brik_help_run_invalid_arg('list_index_fields', $index, 'SCALAR') or return;
1502
1503   my $r;
1504   if (defined($type)) {
1505      $r = $self->get_mappings($index, $type) or return;
1506      if (keys %$r > 1) {
1507         return $self->log->error("list_index_fields: multiple indices found, ".
1508            "choose one");
1509      }
1510      # _default_ mapping may not exists.
1511      if ($self->is_mapping_exists($index, '_default_')) {
1512         my $r2 = $self->get_mappings($index, '_default_');
1513         # Merge
1514         for my $this_index (keys %$r2) {
1515            my $default = $r2->{$this_index}{mappings}{'_default_'};
1516            $r->{$this_index}{mappings}{_default_} = $default;
1517         }
1518      }
1519   }
1520   else {
1521      $r = $self->get_mappings($index) or return;
1522      if (keys %$r > 1) {
1523         return $self->log->error("list_index_fields: multiple indices found, ".
1524            "choose one");
1525      }
1526   }
1527
1528   my @fields = ();
1529   for my $this_index (keys %$r) {
1530      my $mappings = $r->{$this_index}{mappings};
1531      for my $this_type (keys %$mappings) {
1532         my $properties = $mappings->{$this_type}{properties};
1533         push @fields, keys %$properties;
1534      }
1535   }
1536
1537   my %uniq = map { $_ => 1 } @fields;
1538
1539   return [ sort { $a cmp $b } keys %uniq ];
1540}
1541
1542sub list_indices_version {
1543   my $self = shift;
1544   my ($index) = @_;
1545
1546   my $es = $self->_es;
1547   $self->brik_help_run_undef_arg('open', $es) or return;
1548   $self->brik_help_run_undef_arg('list_indices_version', $index) or return;
1549   $self->brik_help_run_invalid_arg('list_indices_version', $index, 'ARRAY', 'SCALAR')
1550      or return;
1551
1552   my $r = $self->get_index($index) or return;
1553
1554   my @list = ();
1555   for my $this (keys %$r) {
1556      my $name = $this;
1557      my $version = $r->{$this}{settings}{index}{version}{created};
1558      push @list, {
1559         index => $name,
1560         version => $version,
1561      };
1562   }
1563
1564   return \@list;
1565}
1566
1567sub open_index {
1568   my $self = shift;
1569   my ($index) = @_;
1570
1571   my $es = $self->_es;
1572   $self->brik_help_run_undef_arg('open', $es) or return;
1573   $self->brik_help_run_undef_arg('open_index', $index) or return;
1574   $self->brik_help_run_invalid_arg('open_index', $index, 'ARRAY', 'SCALAR') or return;
1575
1576   my $r;
1577   eval {
1578      $r = $es->indices->open(
1579         index => $index,
1580      );
1581   };
1582   if ($@) {
1583      chomp($@);
1584      return $self->log->error("open_index: failed: [$@]");
1585   }
1586
1587   return $r;
1588}
1589
1590sub close_index {
1591   my $self = shift;
1592   my ($index) = @_;
1593
1594   my $es = $self->_es;
1595   $self->brik_help_run_undef_arg('open', $es) or return;
1596   $self->brik_help_run_undef_arg('close_index', $index) or return;
1597   $self->brik_help_run_invalid_arg('close_index', $index, 'ARRAY', 'SCALAR') or return;
1598
1599   my $r;
1600   eval {
1601      $r = $es->indices->close(
1602         index => $index,
1603      );
1604   };
1605   if ($@) {
1606      chomp($@);
1607      return $self->log->error("close_index: failed: [$@]");
1608   }
1609
1610   return $r;
1611}
1612
1613#
1614# Search::Elasticsearch::Client::5_0::Direct::Indices
1615#
1616sub get_aliases {
1617   my $self = shift;
1618   my ($index) = @_;
1619
1620   $index ||= $self->index;
1621   my $es = $self->_es;
1622   $self->brik_help_run_undef_arg('open', $es) or return;
1623
1624   my %args = (
1625      index => $index,
1626   );
1627
1628   my $r;
1629   eval {
1630      $r = $es->indices->get(%args);
1631   };
1632   if ($@) {
1633      chomp($@);
1634      return $self->log->error("get_aliases: get_aliases failed: [$@]");
1635   }
1636
1637   my %aliases = ();
1638   for my $this (keys %$r) {
1639      $aliases{$this} = $r->{$this}{aliases};
1640   }
1641
1642   return \%aliases;
1643}
1644
1645#
1646# Search::Elasticsearch::Client::5_0::Direct::Indices
1647#
1648sub put_alias {
1649   my $self = shift;
1650   my ($index, $alias) = @_;
1651
1652   my $es = $self->_es;
1653   $self->brik_help_run_undef_arg('open', $es) or return;
1654   $self->brik_help_run_undef_arg('put_alias', $index) or return;
1655   $self->brik_help_run_undef_arg('put_alias', $alias) or return;
1656
1657   my %args = (
1658      index => $index,
1659      name => $alias,
1660   );
1661
1662   my $r;
1663   eval {
1664      $r = $es->indices->put_alias(%args);
1665   };
1666   if ($@) {
1667      chomp($@);
1668      return $self->log->error("put_alias: put_alias failed: [$@]");
1669   }
1670
1671   return $r;
1672}
1673
1674#
1675# Search::Elasticsearch::Client::5_0::Direct::Indices
1676#
1677sub delete_alias {
1678   my $self = shift;
1679   my ($index, $alias) = @_;
1680
1681   my $es = $self->_es;
1682   $self->brik_help_run_undef_arg('open', $es) or return;
1683   $self->brik_help_run_undef_arg('delete_alias', $index) or return;
1684   $self->brik_help_run_undef_arg('delete_alias', $alias) or return;
1685
1686   my %args = (
1687      index => $index,
1688      name => $alias,
1689   );
1690
1691   my $r;
1692   eval {
1693      $r = $es->indices->delete_alias(%args);
1694   };
1695   if ($@) {
1696      chomp($@);
1697      return $self->log->error("delete_alias: delete_alias failed: [$@]");
1698   }
1699
1700   return $r;
1701}
1702
1703sub update_alias {
1704   my $self = shift;
1705   my ($new_index, $alias) = @_;
1706
1707   my $es = $self->_es;
1708   $self->brik_help_run_undef_arg('open', $es) or return;
1709   $self->brik_help_run_undef_arg('update_alias', $new_index) or return;
1710   $self->brik_help_run_undef_arg('update_alias', $alias) or return;
1711
1712   # Search for previous index with that alias, if any.
1713   my $prev_index;
1714   my $aliases = $self->get_aliases or return;
1715   while (my ($k, $v) = each %$aliases) {
1716      for my $this (keys %$v) {
1717         if ($this eq $alias) {
1718            $prev_index = $k;
1719            last;
1720         }
1721      }
1722      last if $prev_index;
1723   }
1724
1725   # Delete previous alias if it exists.
1726   if (defined($prev_index)) {
1727      $self->delete_alias($prev_index, $alias) or return;
1728   }
1729
1730   return $self->put_alias($new_index, $alias);
1731}
1732
1733sub is_mapping_exists {
1734   my $self = shift;
1735   my ($index, $mapping) = @_;
1736
1737   $self->brik_help_run_undef_arg('is_mapping_exists', $index) or return;
1738   $self->brik_help_run_undef_arg('is_mapping_exists', $mapping) or return;
1739
1740   if (! $self->is_index_exists($index)) {
1741      return 0;
1742   }
1743
1744   my $all = $self->get_mappings($index) or return;
1745   for my $this_index (keys %$all) {
1746      my $mappings = $all->{$this_index}{mappings};
1747      for my $this_mapping (keys %$mappings) {
1748         if ($this_mapping eq $mapping) {
1749            return 1;
1750         }
1751      }
1752   }
1753
1754   return 0;
1755}
1756
1757#
1758# Search::Elasticsearch::Client::2_0::Direct::Indices
1759#
1760sub get_mappings {
1761   my $self = shift;
1762   my ($index, $type) = @_;
1763
1764   my $es = $self->_es;
1765   $self->brik_help_run_undef_arg('open', $es) or return;
1766   $self->brik_help_run_undef_arg('get_mappings', $index) or return;
1767   $self->brik_help_run_invalid_arg('get_mappings', $index, 'ARRAY', 'SCALAR') or return;
1768
1769   my %args = (
1770      index => $index,
1771      type => $type,
1772   );
1773
1774   my $r;
1775   eval {
1776      $r = $es->indices->get_mapping(%args);
1777   };
1778   if ($@) {
1779      chomp($@);
1780      return $self->log->error("get_mappings: get_mapping failed for index [$index]: ".
1781         "[$@]");
1782   }
1783
1784   return $r;
1785}
1786
1787#
1788# Search::Elasticsearch::Client::2_0::Direct::Indices
1789#
1790sub create_index {
1791   my $self = shift;
1792   my ($index, $shards_count) = @_;
1793
1794   my $es = $self->_es;
1795   $self->brik_help_run_undef_arg('open', $es) or return;
1796   $self->brik_help_run_undef_arg('create_index', $index) or return;
1797         
1798   my $r;
1799   eval {
1800      $r = $es->indices->create(
1801         index => $index,
1802      );
1803   };
1804   if ($@) {
1805      chomp($@);
1806      return $self->log->error("create_index: create failed for index [$index]: [$@]");
1807   }
1808   
1809   return $r;
1810}
1811
1812#
1813# https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-put-mapping.html
1814#
1815sub create_index_with_mappings {
1816   my $self = shift;
1817   my ($index, $mappings) = @_;
1818
1819   my $es = $self->_es;
1820   $self->brik_help_run_undef_arg('open', $es) or return;
1821   $self->brik_help_run_undef_arg('create_index_with_mappings', $index) or return;
1822   $self->brik_help_run_undef_arg('create_index_with_mappings', $mappings) or return;
1823   $self->brik_help_run_invalid_arg('create_index_with_mappings', $mappings, 'HASH')
1824      or return;
1825
1826   my $r;
1827   eval {
1828      $r = $es->indices->create(
1829         index => $index,
1830         body => {
1831            mappings => $mappings,
1832         },
1833      );
1834   };
1835   if ($@) {
1836      chomp($@);
1837      return $self->log->error("create_index_with_mappings: create failed for ".
1838         "index [$index]: [$@]");
1839   }
1840
1841   return $r;
1842}
1843
1844# GET http://localhost:9200/
1845sub info {
1846   my $self = shift;
1847   my ($nodes) = @_;
1848
1849   $nodes ||= $self->nodes;
1850   $self->brik_help_run_undef_arg('info', $nodes) or return;
1851   $self->brik_help_run_invalid_arg('info', $nodes, 'ARRAY') or return;
1852   $self->brik_help_run_empty_array_arg('info', $nodes) or return;
1853
1854   my $first = $nodes->[0];
1855
1856   $self->get($first) or return;
1857
1858   return $self->content;
1859}
1860
1861sub version {
1862   my $self = shift;
1863   my ($nodes) = @_;
1864
1865   $nodes ||= $self->nodes;
1866   $self->brik_help_run_undef_arg('version', $nodes) or return;
1867   $self->brik_help_run_invalid_arg('version', $nodes, 'ARRAY') or return;
1868   $self->brik_help_run_empty_array_arg('version', $nodes) or return;
1869
1870   my $first = $nodes->[0];
1871
1872   $self->get($first) or return;
1873   my $content = $self->content or return;
1874
1875   return $content->{version}{number};
1876}
1877
1878#
1879# Search::Elasticsearch::Client::2_0::Direct::Indices
1880#
1881sub get_templates {
1882   my $self = shift;
1883
1884   my $es = $self->_es;
1885   $self->brik_help_run_undef_arg('open', $es) or return;
1886
1887   my $r;
1888   eval {
1889      $r = $es->indices->get_template;
1890   };
1891   if ($@) {
1892      chomp($@);
1893      return $self->log->error("get_templates: failed: [$@]");
1894   }
1895
1896   return $r;
1897}
1898
1899sub list_templates {
1900   my $self = shift;
1901
1902   my $content = $self->get_templates or return;
1903
1904   return [ sort { $a cmp $b } keys %$content ];
1905}
1906
1907#
1908# http://www.elastic.co/guide/en/elasticsearch/reference/current/indices-templates.html
1909#
1910sub get_template {
1911   my $self = shift;
1912   my ($template) = @_;
1913
1914   my $es = $self->_es;
1915   $self->brik_help_run_undef_arg('open', $es) or return;
1916   $self->brik_help_run_undef_arg('get_template', $template) or return;
1917
1918   my $r;
1919   eval {
1920      $r = $es->indices->get_template(
1921         name => $template,
1922      );
1923   };
1924   if ($@) {
1925      chomp($@);
1926      return $self->log->error("get_template: template failed for name [$template]: [$@]");
1927   }
1928
1929   return $r;
1930}
1931
1932#
1933# http://www.elastic.co/guide/en/elasticsearch/reference/current/indices-templates.html
1934#
1935sub put_template {
1936   my $self = shift;
1937   my ($name, $template) = @_;
1938
1939   my $es = $self->_es;
1940   $self->brik_help_run_undef_arg('open', $es) or return;
1941   $self->brik_help_run_undef_arg('put_template', $name) or return;
1942   $self->brik_help_run_undef_arg('put_template', $template) or return;
1943   $self->brik_help_run_invalid_arg('put_template', $template, 'HASH') or return;
1944
1945   my $r;
1946   eval {
1947      $r = $es->indices->put_template(
1948         name => $name,
1949         body => $template,
1950      );
1951   };
1952   if ($@) {
1953      chomp($@);
1954      return $self->log->error("put_template: template failed for name [$name]: [$@]");
1955   }
1956
1957   return $r;
1958}
1959
1960sub put_template_from_json_file {
1961   my $self = shift;
1962   my ($json_file) = @_;
1963
1964   my $es = $self->_es;
1965   $self->brik_help_run_undef_arg('open', $es) or return;
1966   $self->brik_help_run_undef_arg('put_template_from_json_file', $json_file) or return;
1967   $self->brik_help_run_file_not_found('put_template_from_json_file', $json_file)
1968      or return;
1969
1970   my $fj = Metabrik::File::Json->new_from_brik_init($self) or return;
1971   my $data = $fj->read($json_file) or return;
1972
1973   if (! exists($data->{template}) && ! exists($data->{index_patterns})) {
1974      return $self->log->error("put_template_from_json_file: no template name found");
1975   }
1976
1977   my $name = $data->{template} || $data->{index_patterns};
1978
1979   return $self->put_template($name, $data);
1980}
1981
1982sub update_template_from_json_file {
1983   my $self = shift;
1984   my ($json_file) = @_;
1985
1986   my $es = $self->_es;
1987   $self->brik_help_run_undef_arg('open', $es) or return;
1988   $self->brik_help_run_undef_arg('update_template_from_json_file', $json_file) or return;
1989   $self->brik_help_run_file_not_found('update_template_from_json_file', $json_file)
1990      or return;
1991
1992   my $fj = Metabrik::File::Json->new_from_brik_init($self) or return;
1993   my $data = $fj->read($json_file) or return;
1994
1995   if (! exists($data->{template}) && ! exists($data->{index_patterns})) {
1996      return $self->log->error("put_template_from_json_file: no template name found");
1997   }
1998
1999   my $name = $data->{template} || $data->{index_patterns};
2000
2001   $self->delete_template($name);  # We ignore errors, template may not exist.
2002
2003   return $self->put_template($name, $data);
2004}
2005
2006#
2007# http://www.elastic.co/guide/en/elasticsearch/reference/current/indices-get-settings.html
2008# Search::Elasticsearch::Client::2_0::Direct::Indices
2009#
2010sub get_settings {
2011   my $self = shift;
2012   my ($indices, $names) = @_;
2013
2014   my $es = $self->_es;
2015   $self->brik_help_run_undef_arg('open', $es) or return;
2016
2017   my %args = ();
2018   if (defined($indices)) {
2019      $self->brik_help_run_undef_arg('get_settings', $indices) or return;
2020      my $ref = $self->brik_help_run_invalid_arg('get_settings', $indices, 'ARRAY', 'SCALAR')
2021         or return;
2022      $args{index} = $indices;
2023   }
2024   if (defined($names)) {
2025      $self->brik_help_run_file_not_found('get_settings', $names) or return;
2026      my $ref = $self->brik_help_run_invalid_arg('get_settings', $names, 'ARRAY', 'SCALAR')
2027         or return;
2028      $args{name} = $names;
2029   }
2030
2031   my $r;
2032   eval {
2033      $r = $es->indices->get_settings(%args);
2034   };
2035   if ($@) {
2036      chomp($@);
2037      return $self->log->error("get_settings: failed: [$@]");
2038   }
2039
2040   return $r;
2041}
2042
2043#
2044# http://www.elastic.co/guide/en/elasticsearch/reference/current/indices-get-settings.html
2045# Search::Elasticsearch::Client::2_0::Direct::Indices
2046#
2047# Example:
2048#
2049# run client::elasticsearch put_settings "{ index => { refresh_interval => -1 } }"
2050#
2051# XXX: should be renamed to put_index_settings
2052#
2053sub put_settings {
2054   my $self = shift;
2055   my ($settings, $indices) = @_;
2056
2057   my $es = $self->_es;
2058   $self->brik_help_run_undef_arg('open', $es) or return;
2059   $self->brik_help_run_undef_arg('put_settings', $settings) or return;
2060   $self->brik_help_run_invalid_arg('put_settings', $settings, 'HASH') or return;
2061
2062   my %args = (
2063      body => $settings,
2064   );
2065   if (defined($indices)) {
2066      $self->brik_help_run_undef_arg('put_settings', $indices) or return;
2067      my $ref = $self->brik_help_run_invalid_arg('put_settings', $indices, 'ARRAY', 'SCALAR')
2068         or return;
2069      $args{index} = $indices;
2070   }
2071
2072   my $r;
2073   eval {
2074      $r = $es->indices->put_settings(%args);
2075   };
2076   if ($@) {
2077      chomp($@);
2078      return $self->log->error("put_settings: failed: [$@]");
2079   }
2080
2081   return $r;
2082}
2083
2084sub set_index_readonly {
2085   my $self = shift;
2086   my ($indices, $bool) = @_;
2087
2088   my $es = $self->_es;
2089   $self->brik_help_run_undef_arg('open', $es) or return;
2090   $self->brik_help_run_undef_arg('set_index_readonly', $indices) or return;
2091   $self->brik_help_run_invalid_arg('set_index_readonly', $indices, 'ARRAY', 'SCALAR')
2092      or return;
2093
2094   if (! defined($bool)) {
2095      $bool = 'true';
2096   }
2097   else {
2098      $bool = $bool ? 'true' : 'false';
2099   }
2100
2101   my $settings = {
2102      'blocks.read_only' => $bool,
2103      'blocks.read_only_allow_delete' => 'true',
2104   };
2105
2106   return $self->put_settings($settings, $indices);
2107}
2108
2109#
2110# curl -XPUT -H "Content-Type: application/json" http://localhost:9200/_all/_settings -d '{"index.blocks.read_only_allow_delete": null}'
2111# PUT synscan-2018-05/_settings
2112# {
2113#  "index": {
2114#    "blocks":{
2115#      "read_only":"false",
2116#      "read_only_allow_delete":"true"
2117#    }
2118#  }
2119#}
2120#
2121#
2122# If it fails with the following error:
2123#
2124# [2018-09-12T13:38:40,012][INFO ][logstash.outputs.elasticsearch] retrying failed action with response code: 403 ({"type"=>"cluster_block_exception", "reason"=>"blocked by: [FORBIDDEN/12/index read-only / allow delete (api)];"})
2125#
2126# Use Kibana dev console and copy/paste both requests:
2127#
2128# PUT _settings
2129# {
2130#    "index": {
2131#       "blocks": {
2132#          "read_only_allow_delete": "false"
2133#       }
2134#    }
2135# }
2136#   
2137# PUT _all/_settings
2138# {
2139#    "index": {
2140#       "blocks": {
2141#          "read_only_allow_delete": "false"
2142#       }
2143#    }
2144# }
2145#
2146sub reset_index_readonly {
2147   my $self = shift;
2148   my ($indices) = @_;
2149
2150   $indices ||= '*';
2151   my $es = $self->_es;
2152   $self->brik_help_run_undef_arg('open', $es) or return;
2153   $self->brik_help_run_invalid_arg('reset_index_readonly', $indices, 'ARRAY', 'SCALAR')
2154      or return;
2155
2156   my $settings = {
2157      'blocks.read_only_allow_delete' => 'false',
2158   };
2159
2160   my $r = $self->put_settings($settings);
2161   $self->log->info(Data::Dumper::Dumper($r));
2162
2163   $r = $self->put_settings($settings, $indices);
2164   $self->log->info(Data::Dumper::Dumper($r));
2165
2166   return 1;
2167}
2168
2169sub list_index_readonly {
2170   my $self = shift;
2171
2172   my $es = $self->_es;
2173   $self->brik_help_run_undef_arg('open', $es) or return;
2174
2175   my $list = $self->list_indices or return;
2176
2177   my @indices = ();
2178   for my $this (@$list) {
2179      my $ro = $self->get_index_readonly($this) or next;
2180      if (defined($ro->{index}{provided_name})) {
2181         push @indices, $ro->{index}{provided_name};
2182      }
2183   }
2184
2185   return \@indices;
2186}
2187
2188sub set_index_number_of_replicas {
2189   my $self = shift;
2190   my ($indices, $number) = @_;
2191
2192   my $es = $self->_es;
2193   $self->brik_help_run_undef_arg('open', $es) or return;
2194   $self->brik_help_run_undef_arg('set_index_number_of_replicas', $indices) or return;
2195   $self->brik_help_run_invalid_arg('set_index_number_of_replicas', $indices, 'ARRAY', 'SCALAR')
2196      or return;
2197
2198   my $settings = { number_of_replicas => $number };
2199
2200   return $self->put_settings($settings, $indices);
2201}
2202
2203sub set_index_refresh_interval {
2204   my $self = shift;
2205   my ($indices, $number) = @_;
2206
2207   my $es = $self->_es;
2208   $self->brik_help_run_undef_arg('open', $es) or return;
2209   $self->brik_help_run_undef_arg('set_index_refresh_interval', $indices) or return;
2210   $self->brik_help_run_invalid_arg('set_index_refresh_interval', $indices, 'ARRAY', 'SCALAR')
2211      or return;
2212
2213   # If there is a meaningful value not postfixed with a unity,
2214   # we default to add a `s' for a number of seconds.
2215   if ($number =~ /^\d+$/ && $number > 0) {
2216      $number .= 's';
2217   }
2218
2219   my $settings = { refresh_interval => $number };
2220
2221   return $self->put_settings($settings, $indices);
2222}
2223
2224sub get_index_settings {
2225   my $self = shift;
2226   my ($indices) = @_;
2227
2228   my $es = $self->_es;
2229   $self->brik_help_run_undef_arg('open', $es) or return;
2230   $self->brik_help_run_undef_arg('get_index_settings', $indices) or return;
2231   $self->brik_help_run_invalid_arg('get_index_settings', $indices, 'ARRAY', 'SCALAR')
2232      or return;
2233
2234   my $settings = $self->get_settings($indices);
2235
2236   my %indices = ();
2237   for (keys %$settings) {
2238      $indices{$_} = $settings->{$_}{settings};
2239   }
2240
2241   return \%indices;
2242}
2243
2244sub get_index_readonly {
2245   my $self = shift;
2246   my ($indices) = @_;
2247
2248   my $es = $self->_es;
2249   $self->brik_help_run_undef_arg('open', $es) or return;
2250   $self->brik_help_run_undef_arg('get_index_readonly', $indices) or return;
2251   $self->brik_help_run_invalid_arg('get_index_readonly', $indices, 'ARRAY', 'SCALAR')
2252      or return;
2253
2254   my $settings = $self->get_settings($indices);
2255
2256   my %indices = ();
2257   for (keys %$settings) {
2258      #$indices{$_} = $settings->{$_}{settings}{index}{'blocks_write'};
2259      $indices{$_} = $settings->{$_}{settings};
2260   }
2261
2262   return \%indices;
2263}
2264
2265sub get_index_number_of_replicas {
2266   my $self = shift;
2267   my ($indices) = @_;
2268
2269   my $es = $self->_es;
2270   $self->brik_help_run_undef_arg('open', $es) or return;
2271   $self->brik_help_run_undef_arg('get_index_number_of_replicas', $indices) or return;
2272   $self->brik_help_run_invalid_arg('get_index_number_of_replicas', $indices, 'ARRAY', 'SCALAR')
2273      or return;
2274
2275   my $settings = $self->get_settings($indices);
2276
2277   my %indices = ();
2278   for (keys %$settings) {
2279      $indices{$_} = $settings->{$_}{settings}{index}{number_of_replicas};
2280   }
2281
2282   return \%indices;
2283}
2284
2285sub get_index_refresh_interval {
2286   my $self = shift;
2287   my ($indices, $number) = @_;
2288
2289   my $es = $self->_es;
2290   $self->brik_help_run_undef_arg('open', $es) or return;
2291   $self->brik_help_run_undef_arg('get_index_refresh_interval', $indices) or return;
2292   $self->brik_help_run_invalid_arg('get_index_refresh_interval', $indices, 'ARRAY', 'SCALAR')
2293      or return;
2294
2295   my $settings = $self->get_settings($indices);
2296
2297   my %indices = ();
2298   for (keys %$settings) {
2299      $indices{$_} = $settings->{$_}{settings}{index}{refresh_interval};
2300   }
2301
2302   return \%indices;
2303}
2304
2305sub get_index_number_of_shards {
2306   my $self = shift;
2307   my ($indices, $number) = @_;
2308
2309   my $es = $self->_es;
2310   $self->brik_help_run_undef_arg('open', $es) or return;
2311   $self->brik_help_run_undef_arg('get_index_number_of_shards', $indices) or return;
2312   $self->brik_help_run_invalid_arg('get_index_number_of_shards', $indices, 'ARRAY', 'SCALAR')
2313      or return;
2314
2315   my $settings = $self->get_settings($indices);
2316
2317   my %indices = ();
2318   for (keys %$settings) {
2319      $indices{$_} = $settings->{$_}{settings}{index}{number_of_shards};
2320   }
2321
2322   return \%indices;
2323}
2324
2325#
2326# http://www.elastic.co/guide/en/elasticsearch/reference/current/indices-templates.html
2327#
2328sub delete_template {
2329   my $self = shift;
2330   my ($name) = @_;
2331
2332   my $es = $self->_es;
2333   $self->brik_help_run_undef_arg('open', $es) or return;
2334   $self->brik_help_run_undef_arg('delete_template', $name) or return;
2335
2336   my $r;
2337   eval {
2338      $r = $es->indices->delete_template(
2339         name => $name,
2340      );
2341   };
2342   if ($@) {
2343      chomp($@);
2344      return $self->log->error("delete_template: failed for name [$name]: [$@]");
2345   }
2346
2347   return $r;
2348}
2349
2350#
2351# Return a boolean to state for index existence
2352#
2353sub is_index_exists {
2354   my $self = shift;
2355   my ($index) = @_;
2356
2357   my $es = $self->_es;
2358   $self->brik_help_run_undef_arg('open', $es) or return;
2359   $self->brik_help_run_undef_arg('is_index_exists', $index) or return;
2360
2361   my $r;
2362   eval {
2363      $r = $es->indices->exists(
2364         index => $index,
2365      );
2366   };
2367   if ($@) {
2368      chomp($@);
2369      return $self->log->error("is_index_exists: failed for index [$index]: [$@]");
2370   }
2371
2372   return $r ? 1 : 0;
2373}
2374
2375#
2376# Return a boolean to state for index with type existence
2377#
2378sub is_type_exists {
2379   my $self = shift;
2380   my ($index, $type) = @_;
2381
2382   my $es = $self->_es;
2383   $self->brik_help_run_undef_arg('open', $es) or return;
2384   $self->brik_help_run_undef_arg('is_type_exists', $index) or return;
2385   $self->brik_help_run_undef_arg('is_type_exists', $type) or return;
2386
2387   my $r;
2388   eval {
2389      $r = $es->indices->exists_type(
2390         index => $index,
2391         type => $type,
2392      );
2393   };
2394   if ($@) {
2395      chomp($@);
2396      return $self->log->error("is_type_exists: failed for index [$index] and ".
2397         "type [$type]: [$@]");
2398   }
2399
2400   return $r ? 1 : 0;
2401}
2402
2403#
2404# Return a boolean to state for document existence
2405#
2406sub is_document_exists {
2407   my $self = shift;
2408   my ($index, $type, $document) = @_;
2409
2410   my $es = $self->_es;
2411   $self->brik_help_run_undef_arg('open', $es) or return;
2412   $self->brik_help_run_undef_arg('is_document_exists', $index) or return;
2413   $self->brik_help_run_undef_arg('is_document_exists', $type) or return;
2414   $self->brik_help_run_undef_arg('is_document_exists', $document) or return;
2415   $self->brik_help_run_invalid_arg('is_document_exists', $document, 'HASH') or return;
2416
2417   my $r;
2418   eval {
2419      $r = $es->exists(
2420         index => $index,
2421         type => $type,
2422         %$document,
2423      );
2424   };
2425   if ($@) {
2426      chomp($@);
2427      return $self->log->error("is_document_exists: failed for index [$index] and ".
2428         "type [$type]: [$@]");
2429   }
2430
2431   return $r ? 1 : 0;
2432}
2433
2434sub parse_error_string {
2435   my $self = shift;
2436   my ($string) = @_;
2437
2438   $self->brik_help_run_undef_arg('parse_error_string', $string) or return;
2439
2440   # [Timeout] ** [http://X.Y.Z.1:9200]-[599] Timed out while waiting for socket to become ready for reading, called from sub Search::Elasticsearch::Role::Client::Direct::__ANON__ at /usr/local/lib/perl5/site_perl/Metabrik/Client/Elasticsearch.pm line 1466. With vars: {'status_code' => 599,'request' => {'body' => undef,'qs' => {},'ignore' => [],'serialize' => 'std','path' => '/index-thing/_refresh','method' => 'POST'}}
2441
2442   my ($class, $node, $code, $message, $dump) = $string =~
2443      m{^\[([^]]+)\] \*\* \[([^]]+)\]\-\[(\d+)\] (.+)\. With vars: (.+)$};
2444
2445   if (defined($dump) && length($dump)) {
2446      my $sd = Metabrik::String::Dump->new_from_brik_init($self) or return;
2447      $dump = $sd->decode($dump);
2448   }
2449
2450   # Sanity check
2451   if (defined($node) && $node =~ m{^http} && $code =~ m{^\d+$}
2452   &&  defined($dump) && ref($dump) eq 'HASH') {
2453      return {
2454         class => $class,
2455         node => $node,
2456         code => $code,
2457         message => $message,
2458         dump => $dump,
2459      };
2460   }
2461
2462   # Were not able to decode, we return as-is.
2463   return {
2464      message => $string,
2465   };
2466}
2467
2468#
2469# Refresh an index to receive latest additions
2470#
2471# Search::Elasticsearch::Client::5_0::Direct::Indices
2472# https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-refresh.html
2473#
2474sub refresh_index {
2475   my $self = shift;
2476   my ($index) = @_;
2477
2478   my $es = $self->_es;
2479   $self->brik_help_run_undef_arg('open', $es) or return;
2480   $self->brik_help_run_undef_arg('refresh_index', $index) or return;
2481
2482   my $try = $self->try;
2483
2484RETRY:
2485
2486   my $r;
2487   eval {
2488      $r = $es->indices->refresh(
2489         index => $index,
2490      );
2491   };
2492   if ($@) {
2493      if (--$try == 0) {
2494         chomp($@);
2495         my $p = $self->parse_error_string($@);
2496         if (defined($p) && exists($p->{class})) {
2497            my $class = $p->{class};
2498            my $code = $p->{code};
2499            my $node = $p->{node};
2500            return $self->log->error("refresh_index: failed for index [$index] ".
2501               "after [$try] tries with error [$class] code [$code] for node [$node]");
2502         }
2503         else {
2504            return $self->log->error("refresh_index: failed for index [$index] ".
2505               "after [$try]: [$@]");
2506         }
2507      }
2508      sleep 60;
2509      goto RETRY;
2510   }
2511
2512   return $r;
2513}
2514
2515sub export_as_csv {
2516   my $self = shift;
2517   my ($index, $size, $cb) = @_;
2518
2519   $size ||= 10_000;
2520   my $es = $self->_es;
2521   $self->brik_help_run_undef_arg('open', $es) or return;
2522   $self->brik_help_run_undef_arg('export_as_csv', $index) or return;
2523   $self->brik_help_run_undef_arg('export_as_csv', $size) or return;
2524
2525   my $max = $self->max;
2526   my $datadir = $self->datadir;
2527
2528   $self->log->debug("export_as_csv: selecting scroll Command...");
2529
2530   my $scroll;
2531   my $version = $self->version or return;
2532   if ($version lt "5.0.0") {
2533      $scroll = $self->open_scroll_scan_mode($index, $size) or return;
2534   }
2535   else {
2536      $scroll = $self->open_scroll($index, $size) or return;
2537   }
2538
2539   $self->log->debug("export_as_csv: selecting scroll Command...OK.");
2540
2541   my $fd = Metabrik::File::Dump->new_from_brik_init($self) or return;
2542
2543   my $fc = Metabrik::File::Csv->new_from_brik_init($self) or return;
2544   $fc->separator(',');
2545   $fc->escape('\\');
2546   $fc->append(1);
2547   $fc->first_line_is_header(0);
2548   $fc->write_header(1);
2549   $fc->use_quoting(1);
2550   if (defined($self->csv_header)) {
2551      my $sorted = [ sort { $a cmp $b } @{$self->csv_header} ];
2552      $fc->header($sorted);
2553   }
2554   if (defined($self->csv_encoded_fields)) {
2555      $fc->encoded_fields($self->csv_encoded_fields);
2556   }
2557   if (defined($self->csv_object_fields)) {
2558      $fc->object_fields($self->csv_object_fields);
2559   }
2560
2561   my $csv_header = $fc->header;
2562
2563   my $total = $self->total_scroll;
2564   $self->log->info("export_as_csv: total [$total] for index [$index]");
2565
2566   my %types = ();
2567   my $read = 0;
2568   my $skipped = 0;
2569   my $exported = 0;
2570   my $start = time();
2571   my $done = $datadir."/$index.exported";
2572   my $start_time = time();
2573   my %chunk = ();
2574   while (my $next = $self->next_scroll(10000)) {
2575      for my $this (@$next) {
2576         $read++;
2577
2578         if (defined($cb)) {
2579            $this = $cb->($this);
2580            if (! defined($this)) {
2581               $self->log->error("export_as_csv: callback failed for index [$index] ".
2582                  "at read [$read], skipping single entry");
2583               $skipped++;
2584               next;
2585            }
2586         }
2587
2588         my $id = $this->{_id};
2589         my $doc = $this->{_source};
2590         my $type = $this->{_type} || 'doc';  # Prepare for when types will be removed from ES
2591         if (! exists($types{$type})) {
2592            # If not given, we guess the CSV fields to use.
2593            if (! defined($csv_header)) {
2594               my $fields = $self->list_index_fields($index, $type) or return;
2595               #$types{$type}{header} = [ '_id', sort { $a cmp $b } keys %$doc ];
2596               $types{$type}{header} = [ '_id', @$fields ];
2597            }
2598            else {
2599               $types{$type}{header} = [ '_id', @$csv_header ];
2600            }
2601
2602            $types{$type}{output} = $datadir."/$index:$type.csv";
2603
2604            # Verify it has not been exported yet
2605            if (-f $done) {
2606               return $self->log->error("export_as_csv: export already done for index ".
2607                  "[$index]");
2608            }
2609
2610            $self->log->info("export_as_csv: exporting to file [".$types{$type}{output}.
2611               "] for type [$type], using chunk size of [$size]");
2612         }
2613
2614         my $h = { _id => $id };
2615
2616         for my $k (keys %$doc) {
2617            $h->{$k} = $doc->{$k};
2618         }
2619
2620         $fc->header($types{$type}{header});
2621
2622         push @{$chunk{$type}}, $h;
2623         if (@{$chunk{$type}} > 999) {
2624            my $r = $fc->write($chunk{$type}, $types{$type}{output});
2625            if (!defined($r)) {
2626               $self->log->warning("export_as_csv: unable to process entry, skipping");
2627               $skipped++;
2628               next;
2629            }
2630            $chunk{$type} = [];
2631         }
2632
2633         # Log a status sometimes.
2634         if (! (++$exported % 100_000)) {
2635            my $now = time();
2636            my $perc = sprintf("%.02f", $exported / $total * 100);
2637            $self->log->info("export_as_csv: fetched [$exported/$total] ($perc%) ".
2638               "elements in ".($now - $start)." second(s) from index [$index]");
2639            $start = time();
2640         }
2641
2642         # Limit export to specified maximum
2643         if ($max > 0 && $exported >= $max) {
2644            $self->log->info("export_as_csv: max export reached [$exported] for index ".
2645               "[$index], stopping");
2646            last;
2647         }
2648      }
2649   }
2650
2651   # Process remaining data waiting to be written and build output file list
2652   my %files = ();
2653   for my $type (keys %types) {
2654      if (@{$chunk{$type}} > 0) {
2655         $fc->write($chunk{$type}, $types{$type}{output});
2656         $files{$types{$type}{output}}++;
2657      }
2658   }
2659
2660   $self->close_scroll;
2661
2662   my $stop_time = time();
2663   my $duration = $stop_time - $start_time;
2664   my $eps = $exported;
2665   if ($duration > 0) {
2666      $eps = $exported / $duration;
2667   }
2668
2669   my $result = {
2670      read => $read,
2671      exported => $exported,
2672      skipped => $read - $exported,
2673      total_count => $total,
2674      complete => ($exported == $total) ? 1 : 0,
2675      duration => $duration,
2676      eps => $eps, 
2677      files => [ sort { $a cmp $b } keys %files ],
2678   };
2679
2680   # Say the file has been processed, and put resulting stats.
2681   $fd->write($result, $done) or return;
2682
2683   $self->log->info("export_as_csv: done.");
2684
2685   return $result;
2686}
2687
2688#
2689# Optimization instructions:
2690# https://www.elastic.co/guide/en/elasticsearch/reference/master/tune-for-indexing-speed.html
2691#
2692sub import_from_csv {
2693   my $self = shift;
2694   my ($input_csv, $index, $type, $hash, $cb) = @_;
2695
2696   my $es = $self->_es;
2697   $self->brik_help_run_undef_arg('open', $es) or return;
2698   $self->brik_help_run_undef_arg('import_from_csv', $input_csv) or return;
2699   $self->brik_help_run_file_not_found('import_from_csv', $input_csv) or return;
2700
2701   # If index and/or types are not defined, we try to get them from input filename
2702   if (! defined($index) || ! defined($type)) {
2703      # Example: index-DATE:type.csv
2704      if ($input_csv =~ m{^(.+):(.+)\.csv(?:.*)?$}) {
2705         my ($this_index, $this_type) = $input_csv =~ m{^(.+):(.+)\.csv(?:.*)?$};
2706         $index ||= $this_index;
2707         $type ||= $this_type;
2708      }
2709   }
2710
2711   # Verify it has not been indexed yet
2712   my $done = "$input_csv.imported";
2713   if (-f $done) {
2714      $self->log->info("import_from_csv: import already done for file [$input_csv]");
2715      return 0;
2716   }
2717
2718   # And default to Attributes if guess failed.
2719   $index ||= $self->index;
2720   $type ||= $self->type;
2721   $self->brik_help_set_undef_arg('index', $index) or return;
2722   $self->brik_help_set_undef_arg('type', $type) or return;
2723
2724   if ($index eq '*') {
2725      return $self->log->error("import_from_csv: cannot import to invalid index [$index]");
2726   }
2727   if ($type eq '*') {
2728      return $self->log->error("import_from_csv: cannot import to invalid type [$type]");
2729   }
2730
2731   $self->log->debug("input [$input_csv]");
2732   $self->log->debug("index [$index]");
2733   $self->log->debug("type [$type]");
2734
2735   my $count_before = 0;
2736   if ($self->is_index_exists($index)) {
2737      $count_before = $self->count($index, $type);
2738      if (! defined($count_before)) {
2739         return;
2740      }
2741      $self->log->info("import_from_csv: current index [$index] count is ".
2742         "[$count_before]");
2743   }
2744
2745   my $max = $self->max;
2746
2747   $self->open_bulk_mode($index, $type) or return;
2748
2749   $self->log->info("import_from_csv: importing file [$input_csv] to index [$index] ".
2750      "with type [$type]");
2751
2752   my $fd = Metabrik::File::Dump->new_from_brik_init($self) or return;
2753
2754   my $fc = Metabrik::File::Csv->new_from_brik_init($self) or return;
2755   $fc->separator(',');
2756   $fc->escape('\\');
2757   $fc->first_line_is_header(1);
2758   $fc->encoded_fields($self->csv_encoded_fields);
2759   $fc->object_fields($self->csv_object_fields);
2760
2761   my $refresh_interval;
2762   my $number_of_replicas;
2763   my $start = time();
2764   my $speed_settings = {};
2765   my $imported = 0;
2766   my $first = 1;
2767   my $read = 0;
2768   my $skipped_chunks = 0;
2769   my $start_time = time();
2770   while (my $this = $fc->read_next($input_csv)) {
2771      $read++;
2772
2773      my $h = {};
2774      my $id = $this->{_id};
2775      delete $this->{_id};
2776      for my $k (keys %$this) {
2777         my $value = $this->{$k};
2778         # We keep only fields when they have a value.
2779         # No need to index data that is empty.
2780         if (defined($value) && length($value)) {
2781            $h->{$k} = $value;
2782         }
2783      }
2784
2785      if (defined($cb)) {
2786         $h = $cb->($h);
2787         if (! defined($h)) {
2788            $self->log->error("import_from_csv: callback failed for index [$index] ".
2789               "at read [$read], skipping single entry");
2790            $skipped_chunks++;
2791            next;
2792         }
2793      }
2794
2795      #$self->log->info(Data::Dumper::Dumper($h));
2796
2797      my $r;
2798      eval {
2799         $r = $self->index_bulk($h, $index, $type, $hash, $id);
2800      };
2801      if ($@) {
2802         chomp($@);
2803         $self->log->warning("import_from_csv: error [$@]");
2804      }
2805      if (! defined($r)) {
2806         $self->log->error("import_from_csv: bulk processing failed for index [$index] ".
2807            "at read [$read], skipping chunk");
2808         $skipped_chunks++;
2809         next;
2810      }
2811
2812      # Gather index settings, and set values for speed.
2813      # We don't do it earlier, cause we need index to be created,
2814      # and it should have been done from index_bulk Command.
2815      if ($first && $self->is_index_exists($index)) {
2816         # Save current values so we can restore them at the end of Command.
2817         # We ignore errors here, this is non-blocking for indexing.
2818         $refresh_interval = $self->get_index_refresh_interval($index);
2819         $refresh_interval = $refresh_interval->{$index};
2820         $number_of_replicas = $self->get_index_number_of_replicas($index);
2821         $number_of_replicas = $number_of_replicas->{$index};
2822         if ($self->use_indexing_optimizations) {
2823            $self->set_index_number_of_replicas($index, 0);
2824         }
2825         $self->set_index_refresh_interval($index, -1);
2826         $first = 0;
2827      }
2828
2829      # Log a status sometimes.
2830      if (! (++$imported % 100_000)) {
2831         my $now = time();
2832         $self->log->info("import_from_csv: imported [$imported] entries in ".
2833            ($now - $start)." second(s) to index [$index]");
2834         $start = time();
2835      }
2836
2837      # Limit import to specified maximum
2838      if ($max > 0 && $imported >= $max) {
2839         $self->log->info("import_from_csv: max import reached [$imported] for ".
2840            "index [$index], stopping");
2841         last;
2842      }
2843   }
2844
2845   $self->bulk_flush;
2846
2847   my $stop_time = time();
2848   my $duration = $stop_time - $start_time;
2849   my $eps = sprintf("%.02f", $imported / ($duration || 1)); # Avoid divide by zero error.
2850
2851   $self->refresh_index($index);
2852
2853   my $count_current = $self->count($index, $type) or return;
2854   $self->log->info("import_from_csv: after index [$index] count is [$count_current] ".
2855      "at EPS [$eps]");
2856
2857   my $skipped = 0;
2858   my $complete = (($count_current - $count_before) == $read) ? 1 : 0;
2859   if ($complete) {  # If complete, import has been retried, and everything is now ok.
2860      $imported = $read;
2861   }
2862   else {
2863      $skipped = $read - ($count_current - $count_before);
2864   }
2865
2866   my $result = {
2867      read => $read,
2868      imported => $imported,
2869      skipped => $skipped,
2870      previous_count => $count_before,
2871      current_count => $count_current,
2872      complete => $complete,
2873      duration => $duration,
2874      eps => $eps,
2875   };
2876
2877   # Say the file has been processed, and put resulting stats.
2878   $fd->write($result, $done) or return;
2879
2880   # Restore previous settings, if any
2881   if (defined($refresh_interval)) {
2882      $self->set_index_refresh_interval($index, $refresh_interval);
2883   }
2884   if (defined($number_of_replicas) && $self->use_indexing_optimizations) {
2885      $self->set_index_number_of_replicas($index, $number_of_replicas);
2886   }
2887
2888   return $result;
2889}
2890
2891#
2892# Same as import_from_csv Command but in worker mode for speed.
2893#
2894sub import_from_csv_worker {
2895   my $self = shift;
2896   my ($input_csv, $index, $type, $hash, $cb) = @_;
2897
2898   my $es = $self->_es;
2899   $self->brik_help_run_undef_arg('open', $es) or return;
2900   $self->brik_help_run_undef_arg('import_from_csv_worker', $input_csv) or return;
2901   $self->brik_help_run_file_not_found('import_from_csv_worker', $input_csv) or return;
2902
2903   # If index and/or types are not defined, we try to get them from input filename
2904   if (! defined($index) || ! defined($type)) {
2905      # Example: index-DATE:type.csv
2906      if ($input_csv =~ m{^(.+):(.+)\.csv(?:.*)?$}) {
2907         my ($this_index, $this_type) = $input_csv =~ m{^(.+):(.+)\.csv(?:.*)?$};
2908         $index ||= $this_index;
2909         $type ||= $this_type;
2910      }
2911   }
2912
2913   # Verify it has not been indexed yet
2914   my $done = "$input_csv.imported";
2915   if (-f $done) {
2916      $self->log->info("import_from_csv_worker: import already done for ".
2917         "file [$input_csv]");
2918      return 0;
2919   }
2920
2921   # And default to Attributes if guess failed.
2922   $index ||= $self->index;
2923   $type ||= $self->type;
2924   $self->brik_help_set_undef_arg('index', $index) or return;
2925   $self->brik_help_set_undef_arg('type', $type) or return;
2926
2927   if ($index eq '*') {
2928      return $self->log->error("import_from_csv_worker: cannot import to invalid ".
2929         "index [$index]");
2930   }
2931   if ($type eq '*') {
2932      return $self->log->error("import_from_csv_worker: cannot import to invalid ".
2933         "type [$type]");
2934   }
2935
2936   $self->log->debug("input [$input_csv]");
2937   $self->log->debug("index [$index]");
2938   $self->log->debug("type [$type]");
2939
2940   my $count_before = 0;
2941   if ($self->is_index_exists($index)) {
2942      $count_before = $self->count($index, $type);
2943      if (! defined($count_before)) {
2944         return;
2945      }
2946      $self->log->info("import_from_csv_worker: current index [$index] count is ".
2947         "[$count_before]");
2948   }
2949
2950   my $max = $self->max;
2951
2952   $self->open_bulk_mode($index, $type) or return;
2953
2954   #my $batch = undef;
2955   my $batch = 10_000;
2956
2957   $self->log->info("import_from_csv_worker: importing file [$input_csv] to ".
2958      "index [$index] with type [$type] and batch [$batch]");
2959
2960   my $fd = Metabrik::File::Dump->new_from_brik_init($self) or return;
2961
2962   my $fc = Metabrik::File::Csv->new_from_brik_init($self) or return;
2963   $fc->separator(',');
2964   $fc->escape('\\');
2965   $fc->first_line_is_header(1);
2966   $fc->encoded_fields($self->csv_encoded_fields);
2967   $fc->object_fields($self->csv_object_fields);
2968
2969   my $wp = Metabrik::Worker::Parallel->new_from_brik_init($self) or return;
2970   $wp->pool_size(2);
2971
2972   $wp->create_manager or return;
2973
2974   my $refresh_interval;
2975   my $number_of_replicas;
2976   my $start = time();
2977   my $speed_settings = {};
2978   my $imported = 0;
2979   my $first = 1;
2980   my $read = 0;
2981   my $skipped_chunks = 0;
2982   my $start_time = time();
2983   while (my $list = $fc->read_next($input_csv, $batch)) {
2984
2985      $wp->start(sub {
2986         my @list = ();
2987         for my $this (@$list) {
2988            $read++;
2989
2990            my $h = {};
2991            my $id = $this->{_id};
2992            delete $this->{_id};
2993            for my $k (keys %$this) {
2994               my $value = $this->{$k};
2995               # We keep only fields when they have a value.
2996               # No need to index data that is empty.
2997               if (defined($value) && length($value)) {
2998                  $h->{$k} = $value;
2999               }
3000            }
3001
3002            if (defined($cb)) {
3003               $h = $cb->($h);
3004               if (! defined($h)) {
3005                  $self->log->error("import_from_csv_worker: callback failed for ".
3006                     "index [$index] at read [$read], skipping single entry");
3007                  $skipped_chunks++;
3008                  next;
3009               }
3010            }
3011
3012            push @list, $h;
3013         }
3014
3015         my $r;
3016         eval {
3017            $r = $self->index_bulk_from_list(\@list, $index, $type, $hash);
3018         };
3019         if ($@) {
3020            chomp($@);
3021            $self->log->warning("import_from_csv_worker: error [$@]");
3022         }
3023         if (! defined($r)) {
3024            $self->log->error("import_from_csv_worker: bulk processing failed for ".
3025               "index [$index] at read [$read], skipping chunk");
3026            $skipped_chunks++;
3027            next;
3028         }
3029
3030         # Log a status sometimes.
3031         if (! ($imported % 10_000)) {
3032            my $now = time();
3033            my $diff = sprintf("%.02f", $now - $start);
3034            my $eps = sprintf("%.02f", $imported / $diff);
3035            $self->log->info("import_from_csv_worker: imported [$imported] entries ".
3036               "in [$diff] second(s) to index [$index] at EPS [$eps]");
3037            $start = time();
3038         }
3039
3040         exit(0);
3041      });
3042
3043      # Gather index settings, and set values for speed.
3044      # We don't do it earlier, cause we need index to be created,
3045      # and it should have been done from index_bulk Command.
3046      if ($first && $self->is_index_exists($index)) {
3047         # Save current values so we can restore them at the end of Command.
3048         # We ignore errors here, this is non-blocking for indexing.
3049         $refresh_interval = $self->get_index_refresh_interval($index);
3050         $refresh_interval = $refresh_interval->{$index};
3051         $number_of_replicas = $self->get_index_number_of_replicas($index);
3052         $number_of_replicas = $number_of_replicas->{$index};
3053         if ($self->use_indexing_optimizations) {
3054            $self->set_index_number_of_replicas($index, 0);
3055         }
3056         $self->set_index_refresh_interval($index, -1);
3057         $first = 0;
3058      }
3059
3060      # Log a status sometimes.
3061      #$imported += @$list;
3062      #if (! ($imported % 10_000)) {
3063         #my $now = time();
3064         #my $diff = sprintf("%.02f", $now - $start);
3065         #my $eps = sprintf("%.02f", 10_000 / $diff);
3066         #$self->log->info("import_from_csv_worker: imported [$imported] entries ".
3067            #"in [$diff] second(s) to index [$index] at EPS [$eps]");
3068         #$start = time();
3069      #}
3070
3071      # Limit import to specified maximum
3072      if ($max > 0 && $imported >= $max) {
3073         $self->log->info("import_from_csv_worker: max import reached [$imported] for ".
3074            "index [$index], stopping");
3075         last;
3076      }
3077
3078      last if (@$list < $batch);
3079
3080      $imported += @$list;
3081   }
3082
3083   $wp->stop;
3084
3085   $self->bulk_flush;
3086
3087   my $stop_time = time();
3088   my $duration = $stop_time - $start_time;
3089   my $eps = sprintf("%.02f", $imported / ($duration || 1)); # Avoid divide by zero error.
3090
3091   $self->refresh_index($index);
3092
3093   my $count_current = $self->count($index, $type) or return;
3094   $self->log->info("import_from_csv_worker: after index [$index] count ".
3095      "is [$count_current] at EPS [$eps]");
3096
3097   my $skipped = 0;
3098   my $complete = (($count_current - $count_before) == $read) ? 1 : 0;
3099   if ($complete) {  # If complete, import has been retried, and everything is now ok.
3100      $imported = $read;
3101   }
3102   else {
3103      $skipped = $read - ($count_current - $count_before);
3104   }
3105
3106   my $result = {
3107      read => $read,
3108      imported => $imported,
3109      skipped => $skipped,
3110      previous_count => $count_before,
3111      current_count => $count_current,
3112      complete => $complete,
3113      duration => $duration,
3114      eps => $eps,
3115   };
3116
3117   # Say the file has been processed, and put resulting stats.
3118   $fd->write($result, $done) or return;
3119
3120   # Restore previous settings, if any
3121   if (defined($refresh_interval)) {
3122      $self->set_index_refresh_interval($index, $refresh_interval);
3123   }
3124   if (defined($number_of_replicas) && $self->use_indexing_optimizations) {
3125      $self->set_index_number_of_replicas($index, $number_of_replicas);
3126   }
3127
3128   return $result;
3129}
3130
3131#
3132# http://localhost:9200/_nodes/stats/process?pretty
3133#
3134# Search::Elasticsearch::Client::2_0::Direct::Nodes
3135#
3136sub get_stats_process {
3137   my $self = shift;
3138
3139   my $es = $self->_es;
3140   $self->brik_help_run_undef_arg('open', $es) or return;
3141
3142   my $r;
3143   eval {
3144      $r = $es->nodes->stats(
3145         metric => [ qw(process) ],
3146      );
3147   };
3148   if ($@) {
3149      chomp($@);
3150      return $self->log->error("get_stats_process: failed: [$@]");
3151   }
3152
3153   return $r;
3154}
3155
3156#
3157# curl http://localhost:9200/_nodes/process?pretty
3158#
3159# Search::Elasticsearch::Client::2_0::Direct::Nodes
3160#
3161sub get_process {
3162   my $self = shift;
3163
3164   my $es = $self->_es;
3165   $self->brik_help_run_undef_arg('open', $es) or return;
3166
3167   my $r;
3168   eval {
3169      $r = $es->nodes->info(
3170         metric => [ qw(process) ],
3171      );
3172   };
3173   if ($@) {
3174      chomp($@);
3175      return $self->log->error("get_process: failed: [$@]");
3176   }
3177
3178   return $r;
3179}
3180
3181#
3182# Search::Elasticsearch::Client::2_0::Direct::Cluster
3183#
3184sub get_cluster_state {
3185   my $self = shift;
3186
3187   my $es = $self->_es;
3188   $self->brik_help_run_undef_arg('open', $es) or return;
3189
3190   my $r;
3191   eval {
3192      $r = $es->cluster->state;
3193   };
3194   if ($@) {
3195      chomp($@);
3196      return $self->log->error("get_cluster_state: failed: [$@]");
3197   }
3198
3199   return $r;
3200}
3201
3202#
3203# Search::Elasticsearch::Client::2_0::Direct::Cluster
3204#
3205sub get_cluster_health {
3206   my $self = shift;
3207
3208   my $es = $self->_es;
3209   $self->brik_help_run_undef_arg('open', $es) or return;
3210
3211   my $r;
3212   eval {
3213      $r = $es->cluster->health;
3214   };
3215   if ($@) {
3216      chomp($@);
3217      return $self->log->error("get_cluster_health: failed: [$@]");
3218   }
3219
3220   return $r;
3221}
3222
3223#
3224# Search::Elasticsearch::Client::2_0::Direct::Cluster
3225#
3226sub get_cluster_settings {
3227   my $self = shift;
3228
3229   my $es = $self->_es;
3230   $self->brik_help_run_undef_arg('open', $es) or return;
3231
3232   my $r;
3233   eval {
3234      $r = $es->cluster->get_settings;
3235   };
3236   if ($@) {
3237      chomp($@);
3238      return $self->log->error("get_cluster_settings: failed: [$@]");
3239   }
3240
3241   return $r;
3242}
3243
3244#
3245# Search::Elasticsearch::Client::2_0::Direct::Cluster
3246#
3247sub put_cluster_settings {
3248   my $self = shift;
3249   my ($settings) = @_;
3250
3251   my $es = $self->_es;
3252   $self->brik_help_run_undef_arg('open', $es) or return;
3253   $self->brik_help_run_undef_arg('put_cluster_settings', $settings) or return;
3254   $self->brik_help_run_invalid_arg('put_cluster_settings', $settings, 'HASH') or return;
3255
3256   my %args = (
3257      body => $settings,
3258   );
3259
3260   my $r;
3261   eval {
3262      $r = $es->cluster->put_settings(%args);
3263   };
3264   if ($@) {
3265      chomp($@);
3266      return $self->log->error("put_cluster_settings: failed: [$@]");
3267   }
3268
3269   return $r;
3270}
3271
3272sub count_green_indices {
3273   my $self = shift;
3274
3275   my $get = $self->show_indices or return;
3276
3277   my $count = 0;
3278   for (@$get) {
3279      if (/^\s*green\s+/) {
3280         $count++;
3281      }
3282   }
3283
3284   return $count;
3285}
3286
3287sub count_yellow_indices {
3288   my $self = shift;
3289
3290   my $get = $self->show_indices or return;
3291
3292   my $count = 0;
3293   for (@$get) {
3294      if (/^\s*yellow\s+/) {
3295         $count++;
3296      }
3297   }
3298
3299   return $count;
3300}
3301
3302sub count_red_indices {
3303   my $self = shift;
3304
3305   my $get = $self->show_indices or return;
3306
3307   my $count = 0;
3308   for (@$get) {
3309      if (/^\s*red\s+/) {
3310         $count++;
3311      }
3312   }
3313
3314   return $count;
3315}
3316
3317sub count_indices {
3318   my $self = shift;
3319
3320   my $get = $self->show_indices or return;
3321
3322   return scalar @$get;
3323}
3324
3325sub list_indices_status {
3326   my $self = shift;
3327
3328   my $get = $self->show_indices or return;
3329
3330   my $count_red = 0;
3331   my $count_yellow = 0;
3332   my $count_green = 0;
3333   for (@$get) {
3334      if (/^\s*red\s+/) {
3335         $count_red++;
3336      }
3337      elsif (/^\s*yellow\s+/) {
3338         $count_yellow++;
3339      }
3340      elsif (/^\s*green\s+/) {
3341         $count_green++;
3342      }
3343   }
3344
3345   return {
3346      red => $count_red,
3347      yellow => $count_yellow,
3348      green => $count_green,
3349   };
3350}
3351
3352sub count_shards {
3353   my $self = shift;
3354
3355   my $indices = $self->get_indices or return;
3356
3357   my $count = 0;
3358   for (@$indices) {
3359      $count += $_->{shards};
3360   }
3361
3362   return $count;
3363}
3364
3365sub count_size {
3366   my $self = shift;
3367
3368   my $indices = $self->get_indices or return;
3369
3370   my $fn = Metabrik::Format::Number->new_from_brik_init($self) or return;
3371   $fn->kibi_suffix("kb");
3372   $fn->mebi_suffix("mb");
3373   $fn->gibi_suffix("gb");
3374   $fn->kilo_suffix("KB");
3375   $fn->mega_suffix("MB");
3376   $fn->giga_suffix("GB");
3377
3378   my $size = 0;
3379   for (@$indices) {
3380      $size += $fn->to_number($_->{size});
3381   }
3382
3383   return $fn->from_number($size);
3384}
3385
3386sub count_total_size {
3387   my $self = shift;
3388
3389   my $indices = $self->get_indices or return;
3390
3391   my $fn = Metabrik::Format::Number->new_from_brik_init($self) or return;
3392   $fn->kibi_suffix("kb");
3393   $fn->mebi_suffix("mb");
3394   $fn->gibi_suffix("gb");
3395   $fn->kilo_suffix("KB");
3396   $fn->mega_suffix("MB");
3397   $fn->giga_suffix("GB");
3398
3399   my $size = 0;
3400   for (@$indices) {
3401      $size += $fn->to_number($_->{total_size});
3402   }
3403
3404   return $fn->from_number($size);
3405}
3406
3407sub count_count {
3408   my $self = shift;
3409
3410   my $indices = $self->get_indices or return;
3411
3412   my $fn = Metabrik::Format::Number->new_from_brik_init($self) or return;
3413   $fn->kilo_suffix('k');
3414   $fn->mega_suffix('m');
3415   $fn->giga_suffix('M');
3416
3417   my $count = 0;
3418   for (@$indices) {
3419      $count += $_->{count};
3420   }
3421
3422   return $fn->from_number($count);
3423}
3424
3425sub list_green_indices {
3426   my $self = shift;
3427
3428   my $get = $self->get_indices or return;
3429
3430   my @indices = ();
3431   for (@$get) {
3432      if ($_->{color} eq 'green') {
3433         push @indices, $_->{index};
3434      }
3435   }
3436
3437   return \@indices;
3438}
3439
3440sub list_yellow_indices {
3441   my $self = shift;
3442
3443   my $get = $self->get_indices or return;
3444
3445   my @indices = ();
3446   for (@$get) {
3447      if ($_->{color} eq 'yellow') {
3448         push @indices, $_->{index};
3449      }
3450   }
3451
3452   return \@indices;
3453}
3454
3455sub list_red_indices {
3456   my $self = shift;
3457
3458   my $get = $self->get_indices or return;
3459
3460   my @indices = ();
3461   for (@$get) {
3462      if ($_->{color} eq 'red') {
3463         push @indices, $_->{index};
3464      }
3465   }
3466
3467   return \@indices;
3468}
3469
3470#
3471# https://www.elastic.co/guide/en/elasticsearch/reference/current/mapping-types.html
3472#
3473sub list_datatypes {
3474   my $self = shift;
3475
3476   return {
3477      core => [ qw(string long integer short byte double float data boolean binary) ],
3478   };
3479}
3480
3481#
3482# Return total hits for last www_search
3483#
3484sub get_hits_total {
3485   my $self = shift;
3486   my ($run) = @_;
3487
3488   $self->brik_help_run_undef_arg('get_hits_total', $run) or return;
3489
3490   if (ref($run) eq 'HASH') {
3491      if (exists($run->{hits}) && exists($run->{hits}{total})) {
3492         return $run->{hits}{total};
3493      }
3494   }
3495
3496   return $self->log->error("get_hits_total: last Command not compatible");
3497}
3498
3499sub disable_shard_allocation {
3500   my $self = shift;
3501
3502   my $settings = {
3503      persistent => {
3504         'cluster.routing.allocation.enable' => 'none',
3505      }
3506   };
3507
3508   return $self->put_cluster_settings($settings);
3509}
3510
3511sub enable_shard_allocation {
3512   my $self = shift;
3513
3514   my $settings = {
3515      persistent => { 
3516         'cluster.routing.allocation.enable' => 'all',
3517      }
3518   };
3519
3520   return $self->put_cluster_settings($settings);
3521}
3522
3523sub flush_synced {
3524   my $self = shift;
3525
3526   my $es = $self->_es;
3527   $self->brik_help_run_undef_arg('open', $es) or return;
3528
3529   my $r;
3530   eval {
3531      $r = $es->indices->flush_synced;
3532   };
3533   if ($@) {
3534      chomp($@);
3535      return $self->log->error("flush_synced: failed: [$@]");
3536   }
3537
3538   return $r;
3539}
3540
3541#
3542# https://www.elastic.co/guide/en/elasticsearch/reference/current/modules-snapshots.html
3543#
3544# run client::elasticsearch create_snapshot_repository myrepo
3545#      "{ type => 'fs', settings => { compress => 'true', location => '/path/' } }"
3546#
3547# You have to set path.repo in elasticsearch.yml like:
3548# path.repo: ["/home/gomor/es-backups"]
3549#
3550# Search::Elasticsearch::Client::2_0::Direct::Snapshot
3551#
3552sub create_snapshot_repository {
3553   my $self = shift;
3554   my ($body, $repository_name) = @_;
3555
3556   my $es = $self->_es;
3557   $self->brik_help_run_undef_arg('open', $es) or return;
3558   $self->brik_help_run_undef_arg('create_snapshot_repository', $body) or return;
3559
3560   $repository_name ||= 'repository';
3561
3562   my %args = (
3563      repository => $repository_name,
3564      body => $body,
3565   );
3566
3567   my $r;
3568   eval {
3569      $r = $es->snapshot->create_repository(%args);
3570   };
3571   if ($@) {
3572      chomp($@);
3573      return $self->log->error("create_snapshot_repository: failed: [$@]");
3574   }
3575
3576   return $r;
3577}
3578
3579sub create_shared_fs_snapshot_repository {
3580   my $self = shift;
3581   my ($location, $repository_name) = @_;
3582
3583   $repository_name ||= 'repository';
3584   $self->brik_help_run_undef_arg('create_shared_fs_snapshot_repository', $location) or return;
3585
3586   if ($location !~ m{^/}) {
3587      return $self->log->error("create_shared_fs_snapshot_repository: you have to give ".
3588         "a full directory path, this one is invalid [$location]");
3589   }
3590
3591   my $body = {
3592      type => 'fs',
3593      settings => {
3594         compress => 'true',
3595         location => $location,
3596      },
3597   };
3598
3599   return $self->create_snapshot_repository($body, $repository_name);
3600}
3601
3602#
3603# Search::Elasticsearch::Client::2_0::Direct::Snapshot
3604#
3605sub get_snapshot_repositories {
3606   my $self = shift;
3607
3608   my $es = $self->_es;
3609   $self->brik_help_run_undef_arg('open', $es) or return;
3610
3611   my $r;
3612   eval {
3613      $r = $es->snapshot->get_repository;
3614   };
3615   if ($@) {
3616      chomp($@);
3617      return $self->log->error("get_snapshot_repositories: failed: [$@]");
3618   }
3619
3620   return $r;
3621}
3622
3623#
3624# Search::Elasticsearch::Client::2_0::Direct::Snapshot
3625#
3626sub get_snapshot_status {
3627   my $self = shift;
3628
3629   my $es = $self->_es;
3630   $self->brik_help_run_undef_arg('open', $es) or return;
3631
3632   my $r;
3633   eval {
3634      $r = $es->snapshot->status;
3635   };
3636   if ($@) {
3637      chomp($@);
3638      return $self->log->error("get_snapshot_status: failed: [$@]");
3639   }
3640
3641   return $r;
3642}
3643
3644#
3645# Search::Elasticsearch::Client::5_0::Direct::Snapshot
3646#
3647sub create_snapshot {
3648   my $self = shift;
3649   my ($snapshot_name, $repository_name, $body) = @_;
3650
3651   my $es = $self->_es;
3652   $self->brik_help_run_undef_arg('open', $es) or return;
3653
3654   $snapshot_name ||= 'snapshot';
3655   $repository_name ||= 'repository';
3656
3657   my %args = (
3658      repository => $repository_name,
3659      snapshot => $snapshot_name,
3660   );
3661   if (defined($body)) {
3662      $args{body} = $body;
3663   }
3664
3665   my $r;
3666   eval {
3667      $r = $es->snapshot->create(%args);
3668   };
3669   if ($@) {
3670      chomp($@);
3671      return $self->log->error("create_snapshot: failed: [$@]");
3672   }
3673
3674   return $r;
3675}
3676
3677sub create_snapshot_for_indices {
3678   my $self = shift;
3679   my ($indices, $snapshot_name, $repository_name) = @_;
3680
3681   $self->brik_help_run_undef_arg('create_snapshot_for_indices', $indices) or return;
3682
3683   $snapshot_name ||= 'snapshot';
3684   $repository_name ||= 'repository';
3685
3686   my $body = {
3687      indices => $indices,
3688   };
3689
3690   return $self->create_snapshot($snapshot_name, $repository_name, $body);
3691}
3692
3693sub is_snapshot_finished {
3694   my $self = shift;
3695
3696   my $status = $self->get_snapshot_status or return;
3697
3698   if (@{$status->{snapshots}} == 0) {
3699      return 1;
3700   }
3701
3702   return 0;
3703}
3704
3705sub get_snapshot_state {
3706   my $self = shift;
3707
3708   if ($self->is_snapshot_finished) {
3709      return $self->log->info("get_snapshot_state: is already finished");
3710   }
3711
3712   my $status = $self->get_snapshot_status or return;
3713
3714   my @indices_done = ();
3715   my @indices_not_done = ();
3716
3717   my $list = $status->{snapshots};
3718   for my $snapshot (@$list) {
3719      my $indices = $snapshot->{indices};
3720      for my $index (@$indices) {
3721         my $done = $index->{shards_stats}{done};
3722         if ($done) {
3723            push @indices_done, $index;
3724         }
3725         else {
3726            push @indices_not_done, $index;
3727         }
3728      }
3729   }
3730
3731   return { done => \@indices_done, not_done => \@indices_not_done };
3732}
3733
3734sub verify_snapshot_repository {
3735}
3736
3737sub delete_snapshot_repository {
3738   my $self = shift;
3739   my ($repository_name) = @_;
3740
3741   my $es = $self->_es;
3742   $self->brik_help_run_undef_arg('open', $es) or return;
3743   $self->brik_help_run_undef_arg('delete_snapshot_repository', $repository_name) or return;
3744
3745   my $r;
3746   eval {
3747      $r = $es->snapshot->delete_repository(
3748         repository => $repository_name,
3749      );
3750   };
3751   if ($@) {
3752      chomp($@);
3753      return $self->log->error("delete_snapshot_repository: failed: [$@]");
3754   }
3755
3756   return $r;
3757}
3758
3759sub get_snapshot {
3760   my $self = shift;
3761   my ($snapshot_name, $repository_name) = @_;
3762
3763   my $es = $self->_es;
3764   $self->brik_help_run_undef_arg('open', $es) or return;
3765
3766   $snapshot_name ||= 'snapshot';
3767   $repository_name ||= 'repository';
3768
3769   my $r;
3770   eval {
3771      $r = $es->snapshot->get(
3772         repository => $repository_name,
3773         snapshot => $snapshot_name,
3774      );
3775   };
3776   if ($@) {
3777      chomp($@);
3778      return $self->log->error("get_snapshot: failed: [$@]");
3779   }
3780
3781   return $r;
3782}
3783
3784#
3785# Search::Elasticsearch::Client::5_0::Direct::Snapshot
3786#
3787sub delete_snapshot {
3788   my $self = shift;
3789   my ($snapshot_name, $repository_name) = @_;
3790
3791   my $es = $self->_es;
3792   $self->brik_help_run_undef_arg('open', $es) or return;
3793   $self->brik_help_run_undef_arg('delete_snapshot', $snapshot_name) or return;
3794   $self->brik_help_run_undef_arg('delete_snapshot', $repository_name) or return;
3795
3796   my $timeout = $self->rtimeout;
3797
3798   my $r;
3799   eval {
3800      $r = $es->snapshot->delete(
3801         repository => $repository_name,
3802         snapshot => $snapshot_name,
3803         master_timeout => "${timeout}s",
3804      );
3805   };
3806   if ($@) {
3807      chomp($@);
3808      return $self->log->error("delete_snapshot: failed: [$@]");
3809   }
3810
3811   return $r;
3812}
3813
3814#
3815# https://www.elastic.co/guide/en/elasticsearch/reference/current/modules-snapshots.html
3816#
3817sub restore_snapshot {
3818   my $self = shift;
3819   my ($snapshot_name, $repository_name, $body) = @_;
3820
3821   my $es = $self->_es;
3822   $snapshot_name ||= 'snapshot';
3823   $repository_name ||= 'repository';
3824   $self->brik_help_run_undef_arg('open', $es) or return;
3825   $self->brik_help_run_undef_arg('restore_snapshot', $snapshot_name) or return;
3826   $self->brik_help_run_undef_arg('restore_snapshot', $repository_name) or return;
3827
3828   my %args = (
3829      repository => $repository_name,
3830      snapshot => $snapshot_name,
3831   );
3832   if (defined($body)) {
3833      $args{body} = $body;
3834   }
3835
3836   my $r;
3837   eval {
3838      $r = $es->snapshot->restore(%args);
3839   };
3840   if ($@) {
3841      chomp($@);
3842      return $self->log->error("restore_snapshot: failed: [$@]");
3843   }
3844
3845   return $r;
3846}
3847
3848sub restore_snapshot_for_indices {
3849   my $self = shift;
3850   my ($indices, $snapshot_name, $repository_name) = @_;
3851
3852   $snapshot_name ||= 'snapshot';
3853   $repository_name ||= 'repository';
3854   $self->brik_help_run_undef_arg('restore_snapshot_for_indices', $indices) or return;
3855   $self->brik_help_run_undef_arg('restore_snapshot_for_indices', $snapshot_name) or return;
3856   $self->brik_help_run_undef_arg('restore_snapshot_for_indices', $repository_name) or return;
3857
3858   my $body = {
3859      indices => $indices,
3860   };
3861
3862   return $self->restore_snapshot($snapshot_name, $repository_name, $body);
3863}
3864
3865# shard occupation
3866#
3867# curl -XGET "http://127.0.0.1:9200/_cat/shards?v
3868# Or https://www.elastic.co/guide/en/elasticsearch/reference/1.6/cluster-nodes-stats.html
3869#
3870# disk occuption:
3871# curl -XGET http://127.0.0.1:9200/_cat/nodes?h=ip,h,diskAvail,diskTotal
3872#
3873#
3874# Who is master: curl -XGET http://127.0.0.1:9200/_cat/master?v
3875#
3876
3877# Check memory lock
3878
3879# curl -XGET 'localhost:9200/_nodes?filter_path=**.mlockall&pretty'
3880# {
3881#  "nodes" : {
3882#    "3XXX" : {
3883#      "process" : {
3884#        "mlockall" : true
3885#      }
3886#    }
3887#  }
3888# }
3889
38901;
3891
3892__END__
3893
3894=head1 NAME
3895
3896Metabrik::Client::Elasticsearch - client::elasticsearch Brik
3897
3898=head1 SYNOPSIS
3899
3900   host:~> my $q = { term => { ip => "192.168.57.19" } }
3901   host:~> run client::elasticsearch open
3902   host:~> run client::elasticsearch query $q data-*
3903
3904=head1 DESCRIPTION
3905
3906Template to write a new Metabrik Brik.
3907
3908=head1 COPYRIGHT AND LICENSE
3909
3910Copyright (c) 2014-2018, Patrice E<lt>GomoRE<gt> Auffret
3911
3912You may distribute this module under the terms of The BSD 3-Clause License.
3913See LICENSE file in the source distribution archive.
3914
3915=head1 AUTHOR
3916
3917Patrice E<lt>GomoRE<gt> Auffret
3918
3919=cut
Note: See TracBrowser for help on using the repository browser.