source: repository/lib/Metabrik/Client/Elasticsearch/Query.pm

Last change on this file was 945:2c017f5e42f4, checked in by GomoR <gomor@…>, 5 weeks ago
  • UPDATE: client::elasticsearch::*: do not use $RUN from Context anymore
File size: 15.9 KB
Line 
1#
2# $Id$
3#
4# client::elasticsearch::query Brik
5#
6package Metabrik::Client::Elasticsearch::Query;
7use strict;
8use warnings;
9
10use base qw(Metabrik::Client::Elasticsearch);
11
12sub brik_properties {
13   return {
14      revision => '$Revision$',
15      tags => [ qw(unstable) ],
16      author => 'GomoR <GomoR[at]metabrik.org>',
17      license => 'http://opensource.org/licenses/BSD-3-Clause',
18      attributes => {
19         nodes => [ qw(node_list) ], # Inherited
20         index => [ qw(index) ],     # Inherited
21         type => [ qw(type) ],       # Inherited
22         from => [ qw(number) ],     # Inherited
23         size => [ qw(count) ],      # Inherited
24         client => [ qw(INTERNAL) ],
25      },
26      attributes_default => {
27         index => '*',
28         type => '*',
29      },
30      commands => {
31         create_client => [ ],
32         reset_client => [ ],
33         query => [ qw(query index|OPTIONAL type|OPTIONAL hash|OPTIONAL) ],
34         get_query_result_total => [ qw($query_result|OPTIONAL) ],
35         get_query_result_hits => [ qw($query_result|OPTIONAL) ],
36         get_query_result_aggregations => [ qw($query_result|OPTIONAL) ],
37         get_query_result_timed_out => [ qw($query_result|OPTIONAL) ],
38         get_query_result_took => [ qw($query_result|OPTIONAL) ],
39         term => [ qw(kv index|OPTIONAL type|OPTIONAL) ],
40         unique_term => [ qw(unique kv index|OPTIONAL type|OPTIONAL) ],
41         unique_values => [ qw(field index|OPTIONAL type|OPTIONAL) ],
42         wildcard => [ qw(kv index|OPTIONAL type|OPTIONAL) ],
43         range => [ qw(kv_from kv_to index|OPTIONAL type|OPTIONAL) ],
44         top => [ qw(kv_count index|OPTIONAL type|OPTIONAL) ],
45         top_match => [ qw(kv_count kv_match index|OPTIONAL type|OPTIONAL) ],
46         match => [ qw(kv index|OPTIONAL type|OPTIONAL) ],
47         match_phrase => [ qw(kv index|OPTIONAL type|OPTIONAL) ],
48         from_json_file => [ qw(json_file index|OPTIONAL type|OPTIONAL) ],
49         from_dump_file => [ qw(dump_file index|OPTIONAL type|OPTIONAL) ],
50      },
51      require_modules => {
52         'Metabrik::File::Json' => [ ],
53         'Metabrik::File::Dump' => [ ],
54      },
55   };
56}
57
58sub create_client {
59   my $self = shift;
60
61   my $ce = $self->client;
62   if (! defined($ce)) {
63      $ce = $self->open or return;
64      $self->client($ce);
65   }
66
67   return $ce;
68}
69
70sub reset_client {
71   my $self = shift;
72
73   my $ce = $self->client;
74   if (defined($ce)) {
75      $self->client(undef);
76   }
77
78   return 1;
79}
80
81sub get_query_result_total {
82   my $self = shift;
83   my ($run) = @_;
84
85   $self->brik_help_run_undef_arg('get_query_result_total', $run) or return;
86   $self->brik_help_run_invalid_arg('get_query_result_total', $run, 'HASH') or return;
87
88   if (! exists($run->{hits})) {
89      return $self->log->error("get_query_result_total: invalid query result, no hits found");
90   }
91   if (! exists($run->{hits}{total})) {
92      return $self->log->error("get_query_result_total: invalid query result, no total found");
93   }
94
95   return $run->{hits}{total};
96}
97
98sub get_query_result_hits {
99   my $self = shift;
100   my ($run) = @_;
101
102   $self->brik_help_run_undef_arg('get_query_result_hits', $run) or return;
103   $self->brik_help_run_invalid_arg('get_query_result_hits', $run, 'HASH') or return;
104
105   if (! exists($run->{hits})) {
106      return $self->log->error("get_query_result_hits: invalid query result, no hits found");
107   }
108   if (! exists($run->{hits}{hits})) {
109      return $self->log->error("get_query_result_hits: invalid query result, no hits in hits found");
110   }
111
112   return $run->{hits}{hits};
113}
114
115sub get_query_result_aggregations {
116   my $self = shift;
117   my ($run) = @_;
118
119   $self->brik_help_run_undef_arg('get_query_result_aggregations', $run) or return;
120   $self->brik_help_run_invalid_arg('get_query_result_aggregations', $run, 'HASH')
121      or return;
122
123   if (! exists($run->{aggregations})) {
124      return $self->log->error("get_query_result_aggregations: invalid query result, ".
125         "no aggregations found");
126   }
127
128   return $run->{aggregations};
129}
130
131sub get_query_result_timed_out {
132   my $self = shift;
133   my ($run) = @_;
134
135   $self->brik_help_run_undef_arg('get_query_result_timed_out', $run) or return;
136   $self->brik_help_run_invalid_arg('get_query_result_timed_out', $run, 'HASH')
137      or return;
138
139   if (! exists($run->{timed_out})) {
140      return $self->log->error("get_query_result_timed_out: invalid query result, ".
141         "no timed_out found");
142   }
143
144   return $run->{timed_out} ? 1 : 0;
145}
146
147sub get_query_result_took {
148   my $self = shift;
149   my ($run) = @_;
150
151   $self->brik_help_run_undef_arg('get_query_result_took', $run) or return;
152   $self->brik_help_run_invalid_arg('get_query_result_took', $run, 'HASH')
153      or return;
154
155   if (! exists($run->{took})) {
156      return $self->log->error("get_query_result_took: invalid query result, no took found");
157   }
158
159   return $run->{took};
160}
161
162sub query {
163   my $self = shift;
164   my ($q, $index, $type, $hash) = @_;
165
166   $index ||= '*';
167   $type ||= '*';
168   $self->brik_help_run_undef_arg('query', $q) or return;
169
170   my $ce = $self->create_client or return;
171
172   my $r = $self->SUPER::query($q, $index, $type, $hash) or return;
173   if (defined($r)) {
174      if (exists($r->{hits}{total})) {
175         return $r;
176      }
177      else {
178         return $self->log->error("query: failed with [$r]");
179      }
180   }
181
182   return $self->log->error("query: failed");
183}
184
185#
186# run client::elasticsearch::query term domain=example.com index1-*,index2-*
187#
188sub term {
189   my $self = shift;
190   my ($kv, $index, $type) = @_;
191
192   $index ||= $self->index;
193   $type ||= $self->type;
194   $self->brik_help_run_undef_arg('term', $kv) or return;
195   $self->brik_help_set_undef_arg('term', $index) or return;
196   $self->brik_help_set_undef_arg('term', $type) or return;
197
198   if ($kv !~ /^\S+?=.+$/) {
199      return $self->log->error("term: kv must be in the form 'key=value'");
200   }
201   my ($key, $value) = split('=', $kv);
202
203   $self->log->debug("term: key[$key] value[$value]");
204
205   # Optimized version on ES 5.0.0
206   my $q = {
207      size => $self->size,
208      query => {
209         bool => {
210            must => { term => { $key => $value } },
211         },
212      },
213   };
214
215   $self->log->verbose("term: keys [$key] value [$value] index [$index] type [$type]");
216
217   return $self->query($q, $index, $type);
218}
219
220#
221# run client::elasticsearch::query unique_term ip domain=example.com index1-*,index2-*
222#
223sub unique_term {
224   my $self = shift;
225   my ($unique, $kv, $index, $type) = @_;
226
227   $index ||= $self->index;
228   $type ||= $self->type;
229   $self->brik_help_run_undef_arg('unique_term', $unique) or return;
230   $self->brik_help_run_undef_arg('unique_term', $kv) or return;
231   $self->brik_help_set_undef_arg('unique_term', $index) or return;
232   $self->brik_help_set_undef_arg('unique_term', $type) or return;
233
234   if ($kv !~ m{^.+?=.+$}) {
235      return $self->log->error("unique_term: kv [$kv] must be in the form ".
236         "'key=value'");
237   }
238   my ($key, $value) = split('=', $kv);
239
240   $self->log->debug("unique_term: key[$key] value[$value]");
241
242   # Optimized version on ES 5.0.0
243   my $q = {
244      size => 0,
245      query => {
246         bool => {
247            must => { term => { $key => $value } },
248         },
249      },
250      aggs => {
251         1 => {
252            cardinality => {
253               field => $unique,
254               precision_threshold => 40000,
255            },
256         },
257      },
258   };
259
260   $self->log->verbose("unique_term: unique [$unique] keys [$key] value [$value] ".
261      "index [$index] type [$type]");
262
263   return $self->query($q, $index, $type);
264}
265
266#
267#
268#
269sub unique_values {
270   my $self = shift;
271   my ($field, $index, $type) = @_;
272
273   $index ||= $self->index;
274   $type ||= $self->type;
275   $self->brik_help_run_undef_arg('unique_values', $field) or return;
276   $self->brik_help_set_undef_arg('unique_values', $index) or return;
277   $self->brik_help_set_undef_arg('unique_values', $type) or return;
278
279   my $size = $self->size * 10;
280
281   # Will return 10*100000=1_000_000 unique values.
282   my $q = {
283      aggs => {
284         1 => {
285            terms => {
286               field => $field,
287               include => { num_partitions => 10, partition => 0 },
288               size => $size,
289            },
290         },
291      },
292      size => 0,
293   };
294
295   $self->log->verbose("unique_values: unique [$field] index [$index] type [$type]");
296
297   return $self->query($q, $index, $type);
298}
299
300sub wildcard {
301   my $self = shift;
302   my ($kv, $index, $type) = @_;
303
304   $index ||= $self->index;
305   $type ||= $self->type;
306   $self->brik_help_run_undef_arg('wildcard', $kv) or return;
307   $self->brik_help_set_undef_arg('wildcard', $index) or return;
308   $self->brik_help_set_undef_arg('wildcard', $type) or return;
309
310   if ($kv !~ /^\S+?=.+$/) {
311      return $self->log->error("wildcard: kv must be in the form 'key=value'");
312   }
313   my ($key, $value) = split('=', $kv);
314
315   my $q = {
316      size => $self->size,
317      query => {
318         #constant_score => {  # Does not like constant_score
319            #filter => {
320               wildcard => {
321                  $key => $value,
322               },
323            #},
324         #},
325      },
326   };
327
328   $self->log->verbose("wildcard: keys [$key] value [$value] index [$index] type [$type]");
329
330   return $self->query($q, $index, $type);
331}
332
333#
334# run client::elasticsearch::query range ip_range.from=192.168.255.36 ip_range.to=192.168.255.36
335#
336sub range {
337   my $self = shift;
338   my ($kv_from, $kv_to, $index, $type) = @_;
339
340   $index ||= $self->index;
341   $type ||= $self->type;
342   $self->brik_help_run_undef_arg('range', $kv_from) or return;
343   $self->brik_help_run_undef_arg('range', $kv_to) or return;
344   $self->brik_help_set_undef_arg('range', $index) or return;
345   $self->brik_help_set_undef_arg('range', $type) or return;
346
347   if ($kv_from !~ /^\S+?=.+$/) {
348      return $self->log->error("range: kv_from [$kv_from] must be in the form 'key=value'");
349   }
350   if ($kv_to !~ /^\S+?=.+$/) {
351      return $self->log->error("range: kv_to [$kv_to] must be in the form 'key=value'");
352   }
353   my ($key_from, $value_from) = split('=', $kv_from);
354   my ($key_to, $value_to) = split('=', $kv_to);
355
356   #
357   # http://stackoverflow.com/questions/40519806/no-query-registered-for-filtered
358   # https://www.elastic.co/guide/en/elasticsearch/reference/current/query-dsl-filtered-query.html
359   # Compatible with ES 5.0
360   #
361   my $q = {
362      size => $self->size,
363      query => {
364         bool => {
365            must => [
366               { range => { $key_to => { gte => $value_to } } },
367               { range => { $key_from => { lte => $value_from } } },
368            ],
369         },
370      },
371   };
372
373   return $self->query($q, $index, $type);
374}
375
376#
377# run client::elasticsearch::query top name=10 users-*
378#
379sub top {
380   my $self = shift;
381   my ($kv_count, $index, $type) = @_;
382
383   $index ||= $self->index;
384   $type ||= $self->type;
385   $self->brik_help_run_undef_arg('top', $kv_count) or return;
386   $self->brik_help_set_undef_arg('top', $index) or return;
387   $self->brik_help_set_undef_arg('top', $type) or return;
388
389   if ($kv_count !~ /^\S+=\d+$/) {
390      return $self->log->error("top: kv_count [$kv_count] must be in the form 'key=value'");
391   }
392   my ($key_count, $value_count) = split('=', $kv_count);
393
394   my $q = {
395      aggs => {
396         top_values => {
397            terms => {
398               field => $key_count,
399               size => int($value_count),
400               order => { _count => 'desc' },
401            },
402         },
403      },
404   };
405
406   $self->log->verbose("top: key [$key_count] value [$value_count] index [$index] type [$type]");
407
408   return $self->query($q, $index, $type);
409}
410
411sub match_phrase {
412   my $self = shift;
413   my ($kv, $index, $type) = @_;
414
415   $index ||= $self->index;
416   $type ||= $self->type;
417   $self->brik_help_run_undef_arg('match_phrase', $kv) or return;
418   $self->brik_help_set_undef_arg('match_phrase', $index) or return;
419   $self->brik_help_set_undef_arg('match_phrase', $type) or return;
420
421   if ($kv !~ /^\S+?=.+$/) {
422      return $self->log->error("match_phrase: kv must be in the form 'key=value'");
423   }
424   my ($key, $value) = split('=', $kv);
425
426   $self->log->debug("match_phrase: key[$key] value[$value]");
427
428   my $q = {
429      size => $self->size,
430      query => {
431         match_phrase => {
432            $key => $value,
433         },
434      },
435   };
436
437   return $self->query($q, $index, $type);
438}
439
440sub match {
441   my $self = shift;
442   my ($kv, $index, $type) = @_;
443
444   $index ||= $self->index;
445   $type ||= $self->type;
446   $self->brik_help_run_undef_arg('match', $kv) or return;
447   $self->brik_help_set_undef_arg('match', $index) or return;
448   $self->brik_help_set_undef_arg('match', $type) or return;
449
450   if ($kv !~ /^\S+?=.+$/) {
451      return $self->log->error("match: kv must be in the form 'key=value'");
452   }
453   my ($key, $value) = split('=', $kv);
454
455   $self->log->debug("match: key[$key] value[$value]");
456
457   my $q = {
458      size => $self->size,
459      query => {
460         match => {
461            $key => $value,
462         },
463      },
464   };
465
466   return $self->query($q, $index, $type);
467}
468
469#
470# run client::elasticsearch::query top_match domain=10 host=*www*
471#
472sub top_match {
473   my $self = shift;
474   my ($kv_count, $kv_match, $index, $type) = @_;
475
476   $index ||= $self->index;
477   $type ||= $self->type;
478   $self->brik_help_run_undef_arg('top_match', $kv_count) or return;
479   $self->brik_help_run_undef_arg('top_match', $kv_match) or return;
480   $self->brik_help_set_undef_arg('top_match', $index) or return;
481   $self->brik_help_set_undef_arg('top_match', $type) or return;
482
483   if ($kv_count !~ /^\S+?=.+$/) {
484      return $self->log->error("top_match: kv_count [$kv_count] must be in the form 'key=value'");
485   }
486   if ($kv_match !~ /^\S+?=.+$/) {
487      return $self->log->error("top_match: kv_match [$kv_match] must be in the form 'key=value'");
488   }
489   my ($key_count, $value_count) = split('=', $kv_count);
490   my ($key_match, $value_match) = split('=', $kv_match);
491
492   my $q = {
493      size => $self->size,
494      query => {
495         #constant_score => {   # Does not like constant_score
496            #filter => {
497               match => {
498                  $key_match => $value_match,
499               },
500            #},
501         #},
502      },
503      aggs => {
504         top_values => {
505            terms => {
506               field => $key_count,
507               size => int($value_count),
508            },
509         },
510      },
511   };
512
513   return $self->query($q, $index, $type);
514}
515
516sub from_json_file {
517   my $self = shift;
518   my ($file, $index, $type) = @_;
519
520   $index ||= $self->index;
521   $type ||= $self->type;
522   $self->brik_help_run_undef_arg('from_json_file', $file) or return;
523   $self->brik_help_run_file_not_found('from_json_file', $file) or return;
524   $self->brik_help_set_undef_arg('from_json_file', $index) or return;
525   $self->brik_help_set_undef_arg('from_json_file', $type) or return;
526
527   my $fj = Metabrik::File::Json->new_from_brik_init($self) or return;
528   my $q = $fj->read($file) or return;
529
530   if (defined($q) && length($q)) {
531      return $self->query($q, $index, $type);
532   }
533
534   return $self->log->error("from_json_file: nothing to read from this file [$file]");
535}
536
537sub from_dump_file {
538   my $self = shift;
539   my ($file, $index, $type) = @_;
540
541   $index ||= $self->index;
542   $type ||= $self->type;
543   $self->brik_help_run_undef_arg('from_dump_file', $file) or return;
544   $self->brik_help_run_file_not_found('from_dump_file', $file) or return;
545   $self->brik_help_set_undef_arg('from_dump_file', $index) or return;
546   $self->brik_help_set_undef_arg('from_dump_file', $type) or return;
547
548   my $fd = Metabrik::File::Dump->new_from_brik_init($self) or return;
549   my $q = $fd->read($file) or return;
550
551   my $first = $q->[0];
552   if (defined($first)) {
553      return $self->query($first, $index, $type);
554   }
555
556   return $self->log->error("from_dump_file: nothing to read from this file [$file]");
557}
558
5591;
560
561__END__
562
563=head1 NAME
564
565Metabrik::Client::Elasticsearch::Query - client::elasticsearch::query Brik
566
567=head1 COPYRIGHT AND LICENSE
568
569Copyright (c) 2014-2017, Patrice E<lt>GomoRE<gt> Auffret
570
571You may distribute this module under the terms of The BSD 3-Clause License.
572See LICENSE file in the source distribution archive.
573
574=head1 AUTHOR
575
576Patrice E<lt>GomoRE<gt> Auffret
577
578=cut
Note: See TracBrowser for help on using the repository browser.