1219 lines
31 KiB
Erlang
1219 lines
31 KiB
Erlang
%%--------------------------------------------------------------------
|
|
%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
|
|
%%
|
|
%% Licensed under the Apache License, Version 2.0 (the "License");
|
|
%% you may not use this file except in compliance with the License.
|
|
%% You may obtain a copy of the License at
|
|
%%
|
|
%% http://www.apache.org/licenses/LICENSE-2.0
|
|
%%
|
|
%% Unless required by applicable law or agreed to in writing, software
|
|
%% distributed under the License is distributed on an "AS IS" BASIS,
|
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
%% See the License for the specific language governing permissions and
|
|
%% limitations under the License.
|
|
%%--------------------------------------------------------------------
|
|
|
|
-module(emqx_rule_funcs).
|
|
|
|
-include("rule_engine.hrl").
|
|
-include_lib("emqx/include/logger.hrl").
|
|
|
|
-elvis([{elvis_style, god_modules, disable}]).
|
|
|
|
%% IoT Funcs
|
|
-export([
|
|
msgid/0,
|
|
qos/0,
|
|
flags/0,
|
|
flag/1,
|
|
topic/0,
|
|
topic/1,
|
|
clientid/0,
|
|
clientip/0,
|
|
peerhost/0,
|
|
username/0,
|
|
payload/0,
|
|
payload/1,
|
|
contains_topic/2,
|
|
contains_topic/3,
|
|
contains_topic_match/2,
|
|
contains_topic_match/3,
|
|
null/0
|
|
]).
|
|
|
|
%% Arithmetic Funcs
|
|
-export([
|
|
'+'/2,
|
|
'-'/2,
|
|
'*'/2,
|
|
'/'/2,
|
|
'div'/2,
|
|
mod/2,
|
|
eq/2
|
|
]).
|
|
|
|
%% Math Funcs
|
|
-export([
|
|
abs/1,
|
|
acos/1,
|
|
acosh/1,
|
|
asin/1,
|
|
asinh/1,
|
|
atan/1,
|
|
atanh/1,
|
|
ceil/1,
|
|
cos/1,
|
|
cosh/1,
|
|
exp/1,
|
|
floor/1,
|
|
fmod/2,
|
|
log/1,
|
|
log10/1,
|
|
log2/1,
|
|
power/2,
|
|
round/1,
|
|
sin/1,
|
|
sinh/1,
|
|
sqrt/1,
|
|
tan/1,
|
|
tanh/1
|
|
]).
|
|
|
|
%% Bitwise operations
|
|
-export([
|
|
bitnot/1,
|
|
bitand/2,
|
|
bitor/2,
|
|
bitxor/2,
|
|
bitsl/2,
|
|
bitsr/2
|
|
]).
|
|
|
|
%% binary and bitstring Funcs
|
|
-export([
|
|
bitsize/1,
|
|
bytesize/1,
|
|
subbits/2,
|
|
subbits/3,
|
|
subbits/6
|
|
]).
|
|
|
|
%% Data Type Conversion
|
|
-export([
|
|
str/1,
|
|
str_utf8/1,
|
|
bool/1,
|
|
int/1,
|
|
float/1,
|
|
float/2,
|
|
float2str/2,
|
|
map/1,
|
|
bin2hexstr/1,
|
|
hexstr2bin/1
|
|
]).
|
|
|
|
%% Data Type Validation Funcs
|
|
-export([
|
|
is_null/1,
|
|
is_not_null/1,
|
|
is_str/1,
|
|
is_bool/1,
|
|
is_int/1,
|
|
is_float/1,
|
|
is_num/1,
|
|
is_map/1,
|
|
is_array/1
|
|
]).
|
|
|
|
%% String Funcs
|
|
-export([
|
|
lower/1,
|
|
ltrim/1,
|
|
reverse/1,
|
|
rtrim/1,
|
|
strlen/1,
|
|
substr/2,
|
|
substr/3,
|
|
trim/1,
|
|
upper/1,
|
|
split/2,
|
|
split/3,
|
|
concat/2,
|
|
tokens/2,
|
|
tokens/3,
|
|
sprintf_s/2,
|
|
pad/2,
|
|
pad/3,
|
|
pad/4,
|
|
replace/3,
|
|
replace/4,
|
|
regex_match/2,
|
|
regex_replace/3,
|
|
ascii/1,
|
|
find/2,
|
|
find/3,
|
|
jq/2,
|
|
jq/3
|
|
]).
|
|
|
|
%% Map Funcs
|
|
-export([map_new/0]).
|
|
|
|
-export([
|
|
map_get/2,
|
|
map_get/3,
|
|
map_put/3
|
|
]).
|
|
|
|
%% For backward compatibility
|
|
-export([
|
|
mget/2,
|
|
mget/3,
|
|
mput/3
|
|
]).
|
|
|
|
%% Array Funcs
|
|
-export([
|
|
nth/2,
|
|
length/1,
|
|
sublist/2,
|
|
sublist/3,
|
|
first/1,
|
|
last/1,
|
|
contains/2
|
|
]).
|
|
|
|
%% Hash Funcs
|
|
-export([
|
|
md5/1,
|
|
sha/1,
|
|
sha256/1
|
|
]).
|
|
|
|
%% zip Funcs
|
|
-export([
|
|
zip/1,
|
|
unzip/1
|
|
]).
|
|
|
|
%% gzip Funcs
|
|
-export([
|
|
gzip/1,
|
|
gunzip/1
|
|
]).
|
|
|
|
%% compressed Funcs
|
|
-export([
|
|
zip_compress/1,
|
|
zip_uncompress/1
|
|
]).
|
|
|
|
%% Data encode and decode
|
|
-export([
|
|
base64_encode/1,
|
|
base64_decode/1,
|
|
json_decode/1,
|
|
json_encode/1,
|
|
term_decode/1,
|
|
term_encode/1
|
|
]).
|
|
|
|
%% Date functions
|
|
-export([
|
|
now_rfc3339/0,
|
|
now_rfc3339/1,
|
|
unix_ts_to_rfc3339/1,
|
|
unix_ts_to_rfc3339/2,
|
|
rfc3339_to_unix_ts/1,
|
|
rfc3339_to_unix_ts/2,
|
|
now_timestamp/0,
|
|
now_timestamp/1,
|
|
format_date/3,
|
|
format_date/4,
|
|
date_to_unix_ts/3,
|
|
date_to_unix_ts/4,
|
|
timezone_to_second/1,
|
|
timezone_to_offset_seconds/1
|
|
]).
|
|
|
|
%% See extra_functions_module/0 and set_extra_functions_module/1 in the
|
|
%% emqx_rule_engine module
|
|
-callback handle_rule_function(atom(), list()) -> any() | {error, no_match_for_function}.
|
|
|
|
%% MongoDB specific date functions. These functions return a date tuple. The
|
|
%% MongoDB bridge converts such date tuples to a MongoDB date type. The
|
|
%% following functions are therefore only useful for rules with at least one
|
|
%% MongoDB action.
|
|
-export([
|
|
mongo_date/0,
|
|
mongo_date/1,
|
|
mongo_date/2
|
|
]).
|
|
|
|
%% Random Funcs
|
|
-export([
|
|
random/0,
|
|
uuid_v4/0,
|
|
uuid_v4_no_hyphen/0
|
|
]).
|
|
|
|
%% Proc Dict Func
|
|
-export([
|
|
proc_dict_get/1,
|
|
proc_dict_put/2,
|
|
proc_dict_del/1,
|
|
kv_store_get/1,
|
|
kv_store_get/2,
|
|
kv_store_put/2,
|
|
kv_store_del/1
|
|
]).
|
|
|
|
-export(['$handle_undefined_function'/2]).
|
|
|
|
-compile(
|
|
{no_auto_import, [
|
|
abs/1,
|
|
ceil/1,
|
|
floor/1,
|
|
round/1,
|
|
map_get/2
|
|
]}
|
|
).
|
|
|
|
-import(emqx_utils_calendar, [time_unit/1, now_to_rfc3339/0, now_to_rfc3339/1, epoch_to_rfc3339/2]).
|
|
|
|
%% @doc "msgid()" Func
|
|
msgid() ->
|
|
fun
|
|
(#{id := MsgId}) -> MsgId;
|
|
(_) -> undefined
|
|
end.
|
|
|
|
%% @doc "qos()" Func
|
|
qos() ->
|
|
fun
|
|
(#{qos := QoS}) -> QoS;
|
|
(_) -> undefined
|
|
end.
|
|
|
|
%% @doc "topic()" Func
|
|
topic() ->
|
|
fun
|
|
(#{topic := Topic}) -> Topic;
|
|
(_) -> undefined
|
|
end.
|
|
|
|
%% @doc "topic(N)" Func
|
|
topic(I) when is_integer(I) ->
|
|
fun
|
|
(#{topic := Topic}) ->
|
|
lists:nth(I, emqx_topic:tokens(Topic));
|
|
(_) ->
|
|
undefined
|
|
end.
|
|
|
|
%% @doc "flags()" Func
|
|
flags() ->
|
|
fun
|
|
(#{flags := Flags}) -> Flags;
|
|
(_) -> #{}
|
|
end.
|
|
|
|
%% @doc "flags(Name)" Func
|
|
flag(Name) ->
|
|
fun
|
|
(#{flags := Flags}) -> emqx_rule_maps:nested_get({var, Name}, Flags);
|
|
(_) -> undefined
|
|
end.
|
|
|
|
%% @doc "clientid()" Func
|
|
clientid() ->
|
|
fun
|
|
(#{from := ClientId}) -> ClientId;
|
|
(_) -> undefined
|
|
end.
|
|
|
|
%% @doc "username()" Func
|
|
username() ->
|
|
fun
|
|
(#{username := Username}) -> Username;
|
|
(_) -> undefined
|
|
end.
|
|
|
|
%% @doc "clientip()" Func
|
|
clientip() ->
|
|
peerhost().
|
|
|
|
peerhost() ->
|
|
fun
|
|
(#{peerhost := Addr}) -> Addr;
|
|
(_) -> undefined
|
|
end.
|
|
|
|
payload() ->
|
|
fun
|
|
(#{payload := Payload}) -> Payload;
|
|
(_) -> undefined
|
|
end.
|
|
|
|
payload(Path) ->
|
|
fun
|
|
(#{payload := Payload}) when erlang:is_map(Payload) ->
|
|
emqx_rule_maps:nested_get(map_path(Path), Payload);
|
|
(_) ->
|
|
undefined
|
|
end.
|
|
|
|
%% @doc Check if a topic_filter contains a specific topic
|
|
%% TopicFilters = [{<<"t/a">>, #{qos => 0}].
|
|
-spec contains_topic(emqx_types:topic_filters(), emqx_types:topic()) ->
|
|
true | false.
|
|
contains_topic(TopicFilters, Topic) ->
|
|
case find_topic_filter(Topic, TopicFilters, fun eq/2) of
|
|
not_found -> false;
|
|
_ -> true
|
|
end.
|
|
contains_topic(TopicFilters, Topic, QoS) ->
|
|
case find_topic_filter(Topic, TopicFilters, fun eq/2) of
|
|
{_, #{qos := QoS}} -> true;
|
|
_ -> false
|
|
end.
|
|
|
|
-spec contains_topic_match(emqx_types:topic_filters(), emqx_types:topic()) ->
|
|
true | false.
|
|
contains_topic_match(TopicFilters, Topic) ->
|
|
case find_topic_filter(Topic, TopicFilters, fun emqx_topic:match/2) of
|
|
not_found -> false;
|
|
_ -> true
|
|
end.
|
|
contains_topic_match(TopicFilters, Topic, QoS) ->
|
|
case find_topic_filter(Topic, TopicFilters, fun emqx_topic:match/2) of
|
|
{_, #{qos := QoS}} -> true;
|
|
_ -> false
|
|
end.
|
|
|
|
find_topic_filter(Filter, TopicFilters, Func) ->
|
|
try
|
|
[
|
|
case Func(Topic, Filter) of
|
|
true -> throw(Result);
|
|
false -> ok
|
|
end
|
|
|| Result = #{topic := Topic} <- TopicFilters
|
|
],
|
|
not_found
|
|
catch
|
|
throw:Result -> Result
|
|
end.
|
|
|
|
null() ->
|
|
undefined.
|
|
|
|
bytesize(IoList) ->
|
|
erlang:iolist_size(IoList).
|
|
|
|
%%------------------------------------------------------------------------------
|
|
%% Arithmetic Funcs
|
|
%%------------------------------------------------------------------------------
|
|
|
|
%% plus 2 numbers
|
|
'+'(X, Y) when is_number(X), is_number(Y) ->
|
|
X + Y;
|
|
%% string concatenation
|
|
%% this requires one of the arguments is string, the other argument will be converted
|
|
%% to string automatically (implicit conversion)
|
|
'+'(X, Y) when is_binary(X); is_binary(Y) ->
|
|
concat(X, Y).
|
|
|
|
'-'(X, Y) when is_number(X), is_number(Y) ->
|
|
X - Y.
|
|
|
|
'*'(X, Y) when is_number(X), is_number(Y) ->
|
|
X * Y.
|
|
|
|
'/'(X, Y) when is_number(X), is_number(Y) ->
|
|
X / Y.
|
|
|
|
'div'(X, Y) when is_integer(X), is_integer(Y) ->
|
|
X div Y.
|
|
|
|
mod(X, Y) when is_integer(X), is_integer(Y) ->
|
|
X rem Y.
|
|
|
|
eq(X, Y) ->
|
|
X == Y.
|
|
|
|
%%------------------------------------------------------------------------------
|
|
%% Math Funcs
|
|
%%------------------------------------------------------------------------------
|
|
|
|
abs(N) when is_integer(N) ->
|
|
erlang:abs(N).
|
|
|
|
acos(N) when is_number(N) ->
|
|
math:acos(N).
|
|
|
|
acosh(N) when is_number(N) ->
|
|
math:acosh(N).
|
|
|
|
asin(N) when is_number(N) ->
|
|
math:asin(N).
|
|
|
|
asinh(N) when is_number(N) ->
|
|
math:asinh(N).
|
|
|
|
atan(N) when is_number(N) ->
|
|
math:atan(N).
|
|
|
|
atanh(N) when is_number(N) ->
|
|
math:atanh(N).
|
|
|
|
ceil(N) when is_number(N) ->
|
|
erlang:ceil(N).
|
|
|
|
cos(N) when is_number(N) ->
|
|
math:cos(N).
|
|
|
|
cosh(N) when is_number(N) ->
|
|
math:cosh(N).
|
|
|
|
exp(N) when is_number(N) ->
|
|
math:exp(N).
|
|
|
|
floor(N) when is_number(N) ->
|
|
erlang:floor(N).
|
|
|
|
fmod(X, Y) when is_number(X), is_number(Y) ->
|
|
math:fmod(X, Y).
|
|
|
|
log(N) when is_number(N) ->
|
|
math:log(N).
|
|
|
|
log10(N) when is_number(N) ->
|
|
math:log10(N).
|
|
|
|
log2(N) when is_number(N) ->
|
|
math:log2(N).
|
|
|
|
power(X, Y) when is_number(X), is_number(Y) ->
|
|
math:pow(X, Y).
|
|
|
|
round(N) when is_number(N) ->
|
|
erlang:round(N).
|
|
|
|
sin(N) when is_number(N) ->
|
|
math:sin(N).
|
|
|
|
sinh(N) when is_number(N) ->
|
|
math:sinh(N).
|
|
|
|
sqrt(N) when is_number(N) ->
|
|
math:sqrt(N).
|
|
|
|
tan(N) when is_number(N) ->
|
|
math:tan(N).
|
|
|
|
tanh(N) when is_number(N) ->
|
|
math:tanh(N).
|
|
|
|
%%------------------------------------------------------------------------------
|
|
%% Bits Funcs
|
|
%%------------------------------------------------------------------------------
|
|
|
|
bitnot(I) when is_integer(I) ->
|
|
bnot I.
|
|
|
|
bitand(X, Y) when is_integer(X), is_integer(Y) ->
|
|
X band Y.
|
|
|
|
bitor(X, Y) when is_integer(X), is_integer(Y) ->
|
|
X bor Y.
|
|
|
|
bitxor(X, Y) when is_integer(X), is_integer(Y) ->
|
|
X bxor Y.
|
|
|
|
bitsl(X, I) when is_integer(X), is_integer(I) ->
|
|
X bsl I.
|
|
|
|
bitsr(X, I) when is_integer(X), is_integer(I) ->
|
|
X bsr I.
|
|
|
|
bitsize(Bits) when is_bitstring(Bits) ->
|
|
bit_size(Bits).
|
|
|
|
subbits(Bits, Len) when is_integer(Len), is_bitstring(Bits) ->
|
|
subbits(Bits, 1, Len).
|
|
|
|
subbits(Bits, Start, Len) when is_integer(Start), is_integer(Len), is_bitstring(Bits) ->
|
|
get_subbits(Bits, Start, Len, <<"integer">>, <<"unsigned">>, <<"big">>).
|
|
|
|
subbits(Bits, Start, Len, Type, Signedness, Endianness) when
|
|
is_integer(Start), is_integer(Len), is_bitstring(Bits)
|
|
->
|
|
get_subbits(Bits, Start, Len, Type, Signedness, Endianness).
|
|
|
|
get_subbits(Bits, Start, Len, Type, Signedness, Endianness) ->
|
|
Begin = Start - 1,
|
|
case Bits of
|
|
<<_:Begin, Rem/bits>> when Rem =/= <<>> ->
|
|
Sz = bit_size(Rem),
|
|
do_get_subbits(Rem, Sz, Len, Type, Signedness, Endianness);
|
|
_ ->
|
|
undefined
|
|
end.
|
|
|
|
-define(match_bits(Bits0, Pattern, ElesePattern),
|
|
case Bits0 of
|
|
Pattern ->
|
|
SubBits;
|
|
ElesePattern ->
|
|
SubBits
|
|
end
|
|
).
|
|
do_get_subbits(Bits, Sz, Len, <<"integer">>, <<"unsigned">>, <<"big">>) ->
|
|
?match_bits(
|
|
Bits,
|
|
<<SubBits:Len/integer-unsigned-big-unit:1, _/bits>>,
|
|
<<SubBits:Sz/integer-unsigned-big-unit:1>>
|
|
);
|
|
do_get_subbits(Bits, Sz, Len, <<"float">>, <<"unsigned">>, <<"big">>) ->
|
|
?match_bits(
|
|
Bits,
|
|
<<SubBits:Len/float-unsigned-big-unit:1, _/bits>>,
|
|
<<SubBits:Sz/float-unsigned-big-unit:1>>
|
|
);
|
|
do_get_subbits(Bits, Sz, Len, <<"bits">>, <<"unsigned">>, <<"big">>) ->
|
|
?match_bits(
|
|
Bits,
|
|
<<SubBits:Len/bits-unsigned-big-unit:1, _/bits>>,
|
|
<<SubBits:Sz/bits-unsigned-big-unit:1>>
|
|
);
|
|
do_get_subbits(Bits, Sz, Len, <<"integer">>, <<"signed">>, <<"big">>) ->
|
|
?match_bits(
|
|
Bits,
|
|
<<SubBits:Len/integer-signed-big-unit:1, _/bits>>,
|
|
<<SubBits:Sz/integer-signed-big-unit:1>>
|
|
);
|
|
do_get_subbits(Bits, Sz, Len, <<"float">>, <<"signed">>, <<"big">>) ->
|
|
?match_bits(
|
|
Bits,
|
|
<<SubBits:Len/float-signed-big-unit:1, _/bits>>,
|
|
<<SubBits:Sz/float-signed-big-unit:1>>
|
|
);
|
|
do_get_subbits(Bits, Sz, Len, <<"bits">>, <<"signed">>, <<"big">>) ->
|
|
?match_bits(
|
|
Bits,
|
|
<<SubBits:Len/bits-signed-big-unit:1, _/bits>>,
|
|
<<SubBits:Sz/bits-signed-big-unit:1>>
|
|
);
|
|
do_get_subbits(Bits, Sz, Len, <<"integer">>, <<"unsigned">>, <<"little">>) ->
|
|
?match_bits(
|
|
Bits,
|
|
<<SubBits:Len/integer-unsigned-little-unit:1, _/bits>>,
|
|
<<SubBits:Sz/integer-unsigned-little-unit:1>>
|
|
);
|
|
do_get_subbits(Bits, Sz, Len, <<"float">>, <<"unsigned">>, <<"little">>) ->
|
|
?match_bits(
|
|
Bits,
|
|
<<SubBits:Len/float-unsigned-little-unit:1, _/bits>>,
|
|
<<SubBits:Sz/float-unsigned-little-unit:1>>
|
|
);
|
|
do_get_subbits(Bits, Sz, Len, <<"bits">>, <<"unsigned">>, <<"little">>) ->
|
|
?match_bits(
|
|
Bits,
|
|
<<SubBits:Len/bits-unsigned-little-unit:1, _/bits>>,
|
|
<<SubBits:Sz/bits-unsigned-little-unit:1>>
|
|
);
|
|
do_get_subbits(Bits, Sz, Len, <<"integer">>, <<"signed">>, <<"little">>) ->
|
|
?match_bits(
|
|
Bits,
|
|
<<SubBits:Len/integer-signed-little-unit:1, _/bits>>,
|
|
<<SubBits:Sz/integer-signed-little-unit:1>>
|
|
);
|
|
do_get_subbits(Bits, Sz, Len, <<"float">>, <<"signed">>, <<"little">>) ->
|
|
?match_bits(
|
|
Bits,
|
|
<<SubBits:Len/float-signed-little-unit:1, _/bits>>,
|
|
<<SubBits:Sz/float-signed-little-unit:1>>
|
|
);
|
|
do_get_subbits(Bits, Sz, Len, <<"bits">>, <<"signed">>, <<"little">>) ->
|
|
?match_bits(
|
|
Bits,
|
|
<<SubBits:Len/bits-signed-little-unit:1, _/bits>>,
|
|
<<SubBits:Sz/bits-signed-little-unit:1>>
|
|
).
|
|
|
|
%%------------------------------------------------------------------------------
|
|
%% Data Type Conversion Funcs
|
|
%%------------------------------------------------------------------------------
|
|
|
|
str(Data) ->
|
|
emqx_utils_conv:bin(Data).
|
|
|
|
str_utf8(Data) when is_binary(Data); is_list(Data) ->
|
|
unicode:characters_to_binary(Data);
|
|
str_utf8(Data) ->
|
|
unicode:characters_to_binary(str(Data)).
|
|
|
|
bool(Data) ->
|
|
emqx_utils_conv:bool(Data).
|
|
|
|
int(Data) ->
|
|
emqx_utils_conv:int(Data).
|
|
|
|
float(Data) ->
|
|
emqx_utils_conv:float(Data).
|
|
|
|
float(Data, Decimals) when Decimals > 0 ->
|
|
Data1 = emqx_utils_conv:float(Data),
|
|
list_to_float(float_to_list(Data1, [{decimals, Decimals}])).
|
|
|
|
float2str(Float, Precision) ->
|
|
float_to_binary(Float, [{decimals, Precision}, compact]).
|
|
|
|
map(Bin) when is_binary(Bin) ->
|
|
case emqx_utils_json:decode(Bin) of
|
|
Map = #{} ->
|
|
Map;
|
|
_ ->
|
|
error(badarg, [Bin])
|
|
end;
|
|
map(List) when is_list(List) ->
|
|
maps:from_list(List);
|
|
map(Map = #{}) ->
|
|
Map;
|
|
map(Data) ->
|
|
error(badarg, [Data]).
|
|
|
|
bin2hexstr(Bin) when is_binary(Bin) ->
|
|
emqx_utils:bin_to_hexstr(Bin, upper).
|
|
|
|
hexstr2bin(Str) when is_binary(Str) ->
|
|
emqx_utils:hexstr_to_bin(Str).
|
|
|
|
%%------------------------------------------------------------------------------
|
|
%% NULL Funcs
|
|
%%------------------------------------------------------------------------------
|
|
|
|
is_null(undefined) -> true;
|
|
is_null(_Data) -> false.
|
|
|
|
is_not_null(Data) ->
|
|
not is_null(Data).
|
|
|
|
is_str(T) when is_binary(T) -> true;
|
|
is_str(_) -> false.
|
|
|
|
is_bool(T) when is_boolean(T) -> true;
|
|
is_bool(_) -> false.
|
|
|
|
is_int(T) when is_integer(T) -> true;
|
|
is_int(_) -> false.
|
|
|
|
is_float(T) when erlang:is_float(T) -> true;
|
|
is_float(_) -> false.
|
|
|
|
is_num(T) when is_number(T) -> true;
|
|
is_num(_) -> false.
|
|
|
|
is_map(T) when erlang:is_map(T) -> true;
|
|
is_map(_) -> false.
|
|
|
|
is_array(T) when is_list(T) -> true;
|
|
is_array(_) -> false.
|
|
|
|
%%------------------------------------------------------------------------------
|
|
%% String Funcs
|
|
%%------------------------------------------------------------------------------
|
|
|
|
lower(S) when is_binary(S) ->
|
|
string:lowercase(S).
|
|
|
|
ltrim(S) when is_binary(S) ->
|
|
string:trim(S, leading).
|
|
|
|
reverse(S) when is_binary(S) ->
|
|
iolist_to_binary(string:reverse(S)).
|
|
|
|
rtrim(S) when is_binary(S) ->
|
|
string:trim(S, trailing).
|
|
|
|
strlen(S) when is_binary(S) ->
|
|
string:length(S).
|
|
|
|
substr(S, Start) when is_binary(S), is_integer(Start) ->
|
|
string:slice(S, Start).
|
|
|
|
substr(S, Start, Length) when
|
|
is_binary(S),
|
|
is_integer(Start),
|
|
is_integer(Length)
|
|
->
|
|
string:slice(S, Start, Length).
|
|
|
|
trim(S) when is_binary(S) ->
|
|
string:trim(S).
|
|
|
|
upper(S) when is_binary(S) ->
|
|
string:uppercase(S).
|
|
|
|
split(S, P) when is_binary(S), is_binary(P) ->
|
|
[R || R <- string:split(S, P, all), R =/= <<>> andalso R =/= ""].
|
|
|
|
split(S, P, <<"notrim">>) ->
|
|
string:split(S, P, all);
|
|
split(S, P, <<"leading_notrim">>) ->
|
|
string:split(S, P, leading);
|
|
split(S, P, <<"leading">>) when is_binary(S), is_binary(P) ->
|
|
[R || R <- string:split(S, P, leading), R =/= <<>> andalso R =/= ""];
|
|
split(S, P, <<"trailing_notrim">>) ->
|
|
string:split(S, P, trailing);
|
|
split(S, P, <<"trailing">>) when is_binary(S), is_binary(P) ->
|
|
[R || R <- string:split(S, P, trailing), R =/= <<>> andalso R =/= ""].
|
|
|
|
tokens(S, Separators) ->
|
|
[list_to_binary(R) || R <- string:lexemes(binary_to_list(S), binary_to_list(Separators))].
|
|
|
|
tokens(S, Separators, <<"nocrlf">>) ->
|
|
[
|
|
list_to_binary(R)
|
|
|| R <- string:lexemes(binary_to_list(S), binary_to_list(Separators) ++ [$\r, $\n, [$\r, $\n]])
|
|
].
|
|
|
|
%% implicit convert args to strings, and then do concatenation
|
|
concat(S1, S2) ->
|
|
unicode:characters_to_binary([str(S1), str(S2)], unicode).
|
|
|
|
sprintf_s(Format, Args) when is_list(Args) ->
|
|
erlang:iolist_to_binary(io_lib:format(binary_to_list(Format), Args)).
|
|
|
|
pad(S, Len) when is_binary(S), is_integer(Len) ->
|
|
iolist_to_binary(string:pad(S, Len, trailing)).
|
|
|
|
pad(S, Len, <<"trailing">>) when is_binary(S), is_integer(Len) ->
|
|
iolist_to_binary(string:pad(S, Len, trailing));
|
|
pad(S, Len, <<"both">>) when is_binary(S), is_integer(Len) ->
|
|
iolist_to_binary(string:pad(S, Len, both));
|
|
pad(S, Len, <<"leading">>) when is_binary(S), is_integer(Len) ->
|
|
iolist_to_binary(string:pad(S, Len, leading)).
|
|
|
|
pad(S, Len, <<"trailing">>, Char) when is_binary(S), is_integer(Len), is_binary(Char) ->
|
|
Chars = unicode:characters_to_list(Char, utf8),
|
|
iolist_to_binary(string:pad(S, Len, trailing, Chars));
|
|
pad(S, Len, <<"both">>, Char) when is_binary(S), is_integer(Len), is_binary(Char) ->
|
|
Chars = unicode:characters_to_list(Char, utf8),
|
|
iolist_to_binary(string:pad(S, Len, both, Chars));
|
|
pad(S, Len, <<"leading">>, Char) when is_binary(S), is_integer(Len), is_binary(Char) ->
|
|
Chars = unicode:characters_to_list(Char, utf8),
|
|
iolist_to_binary(string:pad(S, Len, leading, Chars)).
|
|
|
|
replace(SrcStr, P, RepStr) when is_binary(SrcStr), is_binary(P), is_binary(RepStr) ->
|
|
iolist_to_binary(string:replace(SrcStr, P, RepStr, all)).
|
|
|
|
replace(SrcStr, P, RepStr, <<"all">>) when is_binary(SrcStr), is_binary(P), is_binary(RepStr) ->
|
|
iolist_to_binary(string:replace(SrcStr, P, RepStr, all));
|
|
replace(SrcStr, P, RepStr, <<"trailing">>) when
|
|
is_binary(SrcStr), is_binary(P), is_binary(RepStr)
|
|
->
|
|
iolist_to_binary(string:replace(SrcStr, P, RepStr, trailing));
|
|
replace(SrcStr, P, RepStr, <<"leading">>) when is_binary(SrcStr), is_binary(P), is_binary(RepStr) ->
|
|
iolist_to_binary(string:replace(SrcStr, P, RepStr, leading)).
|
|
|
|
regex_match(Str, RE) ->
|
|
case re:run(Str, RE, [global, {capture, none}]) of
|
|
match -> true;
|
|
nomatch -> false
|
|
end.
|
|
|
|
regex_replace(SrcStr, RE, RepStr) ->
|
|
re:replace(SrcStr, RE, RepStr, [global, {return, binary}]).
|
|
|
|
ascii(Char) when is_binary(Char) ->
|
|
[FirstC | _] = binary_to_list(Char),
|
|
FirstC.
|
|
|
|
find(S, P) when is_binary(S), is_binary(P) ->
|
|
find_s(S, P, leading).
|
|
|
|
find(S, P, <<"trailing">>) when is_binary(S), is_binary(P) ->
|
|
find_s(S, P, trailing);
|
|
find(S, P, <<"leading">>) when is_binary(S), is_binary(P) ->
|
|
find_s(S, P, leading).
|
|
|
|
find_s(S, P, Dir) ->
|
|
case string:find(S, P, Dir) of
|
|
nomatch -> <<"">>;
|
|
SubStr -> SubStr
|
|
end.
|
|
|
|
-spec jq(FilterProgram, JSON, TimeoutMS) -> Result when
|
|
FilterProgram :: binary(),
|
|
JSON :: binary() | term(),
|
|
TimeoutMS :: non_neg_integer(),
|
|
Result :: [term()].
|
|
jq(FilterProgram, JSONBin, TimeoutMS) when
|
|
is_binary(FilterProgram), is_binary(JSONBin)
|
|
->
|
|
case jq:process_json(FilterProgram, JSONBin, TimeoutMS) of
|
|
{ok, Result} ->
|
|
[json_decode(JSONString) || JSONString <- Result];
|
|
{error, ErrorReason} ->
|
|
erlang:throw({jq_exception, ErrorReason})
|
|
end;
|
|
jq(FilterProgram, JSONTerm, TimeoutMS) when is_binary(FilterProgram) ->
|
|
JSONBin = json_encode(JSONTerm),
|
|
jq(FilterProgram, JSONBin, TimeoutMS).
|
|
|
|
-spec jq(FilterProgram, JSON) -> Result when
|
|
FilterProgram :: binary(),
|
|
JSON :: binary() | term(),
|
|
Result :: [term()].
|
|
jq(FilterProgram, JSONBin) ->
|
|
ConfigRootKey = emqx_rule_engine_schema:namespace(),
|
|
jq(
|
|
FilterProgram,
|
|
JSONBin,
|
|
emqx_config:get([
|
|
ConfigRootKey,
|
|
jq_function_default_timeout
|
|
])
|
|
).
|
|
|
|
%%------------------------------------------------------------------------------
|
|
%% Array Funcs
|
|
%%------------------------------------------------------------------------------
|
|
|
|
nth(N, L) when is_integer(N), is_list(L) ->
|
|
lists:nth(N, L).
|
|
|
|
length(List) when is_list(List) ->
|
|
erlang:length(List).
|
|
|
|
sublist(Len, List) when is_integer(Len), is_list(List) ->
|
|
lists:sublist(List, Len).
|
|
|
|
sublist(Start, Len, List) when is_integer(Start), is_integer(Len), is_list(List) ->
|
|
lists:sublist(List, Start, Len).
|
|
|
|
first(List) when is_list(List) ->
|
|
hd(List).
|
|
|
|
last(List) when is_list(List) ->
|
|
lists:last(List).
|
|
|
|
contains(Elm, List) when is_list(List) ->
|
|
lists:member(Elm, List).
|
|
|
|
map_new() ->
|
|
#{}.
|
|
|
|
map_get(Key, Map) ->
|
|
map_get(Key, Map, undefined).
|
|
|
|
map_get(Key, Map, Default) ->
|
|
emqx_rule_maps:nested_get(map_path(Key), Map, Default).
|
|
|
|
map_put(Key, Val, Map) ->
|
|
emqx_rule_maps:nested_put(map_path(Key), Val, Map).
|
|
|
|
mget(Key, Map) ->
|
|
mget(Key, Map, undefined).
|
|
|
|
mget(Key, Map, Default) ->
|
|
case maps:find(Key, Map) of
|
|
{ok, Val} ->
|
|
Val;
|
|
error when is_atom(Key) ->
|
|
%% the map may have an equivalent binary-form key
|
|
BinKey = emqx_utils_conv:bin(Key),
|
|
case maps:find(BinKey, Map) of
|
|
{ok, Val} -> Val;
|
|
error -> Default
|
|
end;
|
|
error when is_binary(Key) ->
|
|
%% the map may have an equivalent atom-form key
|
|
try
|
|
AtomKey = list_to_existing_atom(binary_to_list(Key)),
|
|
case maps:find(AtomKey, Map) of
|
|
{ok, Val} -> Val;
|
|
error -> Default
|
|
end
|
|
catch
|
|
error:badarg ->
|
|
Default
|
|
end;
|
|
error ->
|
|
Default
|
|
end.
|
|
|
|
mput(Key, Val, Map) ->
|
|
case maps:find(Key, Map) of
|
|
{ok, _} ->
|
|
maps:put(Key, Val, Map);
|
|
error when is_atom(Key) ->
|
|
%% the map may have an equivalent binary-form key
|
|
BinKey = emqx_utils_conv:bin(Key),
|
|
case maps:find(BinKey, Map) of
|
|
{ok, _} -> maps:put(BinKey, Val, Map);
|
|
error -> maps:put(Key, Val, Map)
|
|
end;
|
|
error when is_binary(Key) ->
|
|
%% the map may have an equivalent atom-form key
|
|
try
|
|
AtomKey = list_to_existing_atom(binary_to_list(Key)),
|
|
case maps:find(AtomKey, Map) of
|
|
{ok, _} -> maps:put(AtomKey, Val, Map);
|
|
error -> maps:put(Key, Val, Map)
|
|
end
|
|
catch
|
|
error:badarg ->
|
|
maps:put(Key, Val, Map)
|
|
end;
|
|
error ->
|
|
maps:put(Key, Val, Map)
|
|
end.
|
|
|
|
%%------------------------------------------------------------------------------
|
|
%% Hash Funcs
|
|
%%------------------------------------------------------------------------------
|
|
|
|
md5(S) when is_binary(S) ->
|
|
hash(md5, S).
|
|
|
|
sha(S) when is_binary(S) ->
|
|
hash(sha, S).
|
|
|
|
sha256(S) when is_binary(S) ->
|
|
hash(sha256, S).
|
|
|
|
hash(Type, Data) ->
|
|
emqx_utils:bin_to_hexstr(crypto:hash(Type, Data), lower).
|
|
|
|
%%------------------------------------------------------------------------------
|
|
%% gzip Funcs
|
|
%%------------------------------------------------------------------------------
|
|
|
|
gzip(S) when is_binary(S) ->
|
|
zlib:gzip(S).
|
|
|
|
gunzip(S) when is_binary(S) ->
|
|
zlib:gunzip(S).
|
|
|
|
%%------------------------------------------------------------------------------
|
|
%% zip Funcs
|
|
%%------------------------------------------------------------------------------
|
|
|
|
zip(S) when is_binary(S) ->
|
|
zlib:zip(S).
|
|
|
|
unzip(S) when is_binary(S) ->
|
|
zlib:unzip(S).
|
|
|
|
%%------------------------------------------------------------------------------
|
|
%% zip_compress Funcs
|
|
%%------------------------------------------------------------------------------
|
|
|
|
zip_compress(S) when is_binary(S) ->
|
|
zlib:compress(S).
|
|
|
|
zip_uncompress(S) when is_binary(S) ->
|
|
zlib:uncompress(S).
|
|
|
|
%%------------------------------------------------------------------------------
|
|
%% Data encode and decode Funcs
|
|
%%------------------------------------------------------------------------------
|
|
|
|
base64_encode(Data) when is_binary(Data) ->
|
|
base64:encode(Data).
|
|
|
|
base64_decode(Data) when is_binary(Data) ->
|
|
base64:decode(Data).
|
|
|
|
json_encode(Data) ->
|
|
emqx_utils_json:encode(Data).
|
|
|
|
json_decode(Data) ->
|
|
emqx_utils_json:decode(Data, [return_maps]).
|
|
|
|
term_encode(Term) ->
|
|
erlang:term_to_binary(Term).
|
|
|
|
term_decode(Data) when is_binary(Data) ->
|
|
erlang:binary_to_term(Data).
|
|
|
|
%%------------------------------------------------------------------------------
|
|
%% Random Funcs
|
|
%%------------------------------------------------------------------------------
|
|
random() ->
|
|
rand:uniform().
|
|
|
|
uuid_v4() ->
|
|
uuid_str(uuid:get_v4(), binary_standard).
|
|
|
|
uuid_v4_no_hyphen() ->
|
|
uuid_str(uuid:get_v4(), binary_nodash).
|
|
|
|
%%------------------------------------------------------------------------------
|
|
%% Dict Funcs
|
|
%%------------------------------------------------------------------------------
|
|
|
|
-define(DICT_KEY(KEY), {'@rule_engine', KEY}).
|
|
proc_dict_get(Key) ->
|
|
erlang:get(?DICT_KEY(Key)).
|
|
|
|
proc_dict_put(Key, Val) ->
|
|
erlang:put(?DICT_KEY(Key), Val).
|
|
|
|
proc_dict_del(Key) ->
|
|
erlang:erase(?DICT_KEY(Key)).
|
|
|
|
kv_store_put(Key, Val) ->
|
|
ets:insert(?KV_TAB, {Key, Val}).
|
|
|
|
kv_store_get(Key) ->
|
|
kv_store_get(Key, undefined).
|
|
kv_store_get(Key, Default) ->
|
|
case ets:lookup(?KV_TAB, Key) of
|
|
[{_, Val}] -> Val;
|
|
_ -> Default
|
|
end.
|
|
|
|
kv_store_del(Key) ->
|
|
ets:delete(?KV_TAB, Key).
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Date functions
|
|
%%--------------------------------------------------------------------
|
|
|
|
now_rfc3339() ->
|
|
now_to_rfc3339().
|
|
|
|
now_rfc3339(Unit) ->
|
|
now_to_rfc3339(time_unit(Unit)).
|
|
|
|
unix_ts_to_rfc3339(Epoch) ->
|
|
epoch_to_rfc3339(Epoch, second).
|
|
|
|
unix_ts_to_rfc3339(Epoch, Unit) when is_integer(Epoch) ->
|
|
epoch_to_rfc3339(Epoch, time_unit(Unit)).
|
|
|
|
rfc3339_to_unix_ts(DateTime) ->
|
|
rfc3339_to_unix_ts(DateTime, second).
|
|
|
|
rfc3339_to_unix_ts(DateTime, Unit) when is_binary(DateTime) ->
|
|
calendar:rfc3339_to_system_time(
|
|
binary_to_list(DateTime),
|
|
[{unit, time_unit(Unit)}]
|
|
).
|
|
|
|
now_timestamp() ->
|
|
erlang:system_time(second).
|
|
|
|
now_timestamp(Unit) ->
|
|
erlang:system_time(time_unit(Unit)).
|
|
|
|
format_date(TimeUnit, Offset, FormatString) ->
|
|
Unit = time_unit(TimeUnit),
|
|
TimeEpoch = erlang:system_time(Unit),
|
|
format_date(Unit, Offset, FormatString, TimeEpoch).
|
|
|
|
format_date(TimeUnit, Offset, FormatString, TimeEpoch) ->
|
|
Unit = time_unit(TimeUnit),
|
|
emqx_utils_conv:bin(
|
|
lists:concat(
|
|
emqx_utils_calendar:format(TimeEpoch, Unit, Offset, FormatString)
|
|
)
|
|
).
|
|
|
|
date_to_unix_ts(TimeUnit, FormatString, InputString) ->
|
|
Unit = time_unit(TimeUnit),
|
|
emqx_utils_calendar:parse(InputString, Unit, FormatString).
|
|
|
|
date_to_unix_ts(TimeUnit, Offset, FormatString, InputString) ->
|
|
Unit = time_unit(TimeUnit),
|
|
OffsetSecond = emqx_utils_calendar:offset_second(Offset),
|
|
OffsetDelta = erlang:convert_time_unit(OffsetSecond, second, Unit),
|
|
date_to_unix_ts(Unit, FormatString, InputString) - OffsetDelta.
|
|
|
|
timezone_to_second(TimeZone) ->
|
|
timezone_to_offset_seconds(TimeZone).
|
|
|
|
timezone_to_offset_seconds(TimeZone) ->
|
|
emqx_utils_calendar:offset_second(TimeZone).
|
|
|
|
'$handle_undefined_function'(sprintf, [Format | Args]) ->
|
|
erlang:apply(fun sprintf_s/2, [Format, Args]);
|
|
%% This is for functions that should be handled in another module
|
|
%% (currently this module is emqx_schema_registry_serde in the case of EE but
|
|
%% could be changed to another module in the future).
|
|
'$handle_undefined_function'(FunctionName, Args) ->
|
|
case emqx_rule_engine:extra_functions_module() of
|
|
undefined ->
|
|
throw_sql_function_not_supported(FunctionName, Args);
|
|
Mod ->
|
|
case Mod:handle_rule_function(FunctionName, Args) of
|
|
{error, no_match_for_function} ->
|
|
throw_sql_function_not_supported(FunctionName, Args);
|
|
Result ->
|
|
Result
|
|
end
|
|
end.
|
|
|
|
-spec throw_sql_function_not_supported(atom(), list()) -> no_return().
|
|
throw_sql_function_not_supported(FunctionName, Args) ->
|
|
error({sql_function_not_supported, function_literal(FunctionName, Args)}).
|
|
|
|
map_path(Key) ->
|
|
{path, [{key, P} || P <- string:split(Key, ".", all)]}.
|
|
|
|
function_literal(Fun, []) when is_atom(Fun) ->
|
|
atom_to_list(Fun) ++ "()";
|
|
function_literal(Fun, [FArg | Args]) when is_atom(Fun), is_list(Args) ->
|
|
WithFirstArg = io_lib:format("~ts(~0p", [atom_to_list(Fun), FArg]),
|
|
lists:foldl(
|
|
fun(Arg, Literal) ->
|
|
io_lib:format("~ts, ~0p", [Literal, Arg])
|
|
end,
|
|
WithFirstArg,
|
|
Args
|
|
) ++ ")";
|
|
function_literal(Fun, Args) ->
|
|
{invalid_func, {Fun, Args}}.
|
|
|
|
mongo_date() ->
|
|
maybe_isodate_format(erlang:timestamp()).
|
|
|
|
mongo_date(MillisecondsTimestamp) ->
|
|
maybe_isodate_format(convert_timestamp(MillisecondsTimestamp)).
|
|
|
|
mongo_date(Timestamp, Unit) ->
|
|
InsertedTimeUnit = time_unit(Unit),
|
|
ScaledEpoch = erlang:convert_time_unit(Timestamp, InsertedTimeUnit, millisecond),
|
|
mongo_date(ScaledEpoch).
|
|
|
|
maybe_isodate_format(ErlTimestamp) ->
|
|
case emqx_rule_sqltester:is_test_runtime_env() of
|
|
false ->
|
|
ErlTimestamp;
|
|
true ->
|
|
%% if this is called from sqltest, we need to convert it to the ISODate() format,
|
|
%% so that it can be correctly converted into a JSON string.
|
|
isodate_format(ErlTimestamp)
|
|
end.
|
|
|
|
isodate_format({MegaSecs, Secs, MicroSecs}) ->
|
|
SystemTimeMs = (MegaSecs * 1000_000_000_000 + Secs * 1000_000 + MicroSecs) div 1000,
|
|
Ts3339Str = calendar:system_time_to_rfc3339(SystemTimeMs, [{unit, millisecond}, {offset, "Z"}]),
|
|
iolist_to_binary(["ISODate(", Ts3339Str, ")"]).
|
|
|
|
convert_timestamp(MillisecondsTimestamp) ->
|
|
MicroTimestamp = MillisecondsTimestamp * 1000,
|
|
MegaSecs = MicroTimestamp div 1000_000_000_000,
|
|
Secs = MicroTimestamp div 1000_000 - MegaSecs * 1000_000,
|
|
MicroSecs = MicroTimestamp rem 1000_000,
|
|
{MegaSecs, Secs, MicroSecs}.
|
|
|
|
uuid_str(UUID, DisplayOpt) ->
|
|
uuid:uuid_to_string(UUID, DisplayOpt).
|