parent
0414cb6c86
commit
c2e1bc039b
|
@ -16,6 +16,8 @@
|
||||||
|
|
||||||
-define(APP, emqx_rule_engine).
|
-define(APP, emqx_rule_engine).
|
||||||
|
|
||||||
|
-define(KV_TAB, '@rule_engine_db').
|
||||||
|
|
||||||
-type(maybe(T) :: T | undefined).
|
-type(maybe(T) :: T | undefined).
|
||||||
|
|
||||||
-type(rule_id() :: binary()).
|
-type(rule_id() :: binary()).
|
||||||
|
|
|
@ -16,6 +16,8 @@
|
||||||
|
|
||||||
-module(emqx_rule_funcs).
|
-module(emqx_rule_funcs).
|
||||||
|
|
||||||
|
-include("rule_engine.hrl").
|
||||||
|
|
||||||
%% IoT Funcs
|
%% IoT Funcs
|
||||||
-export([ msgid/0
|
-export([ msgid/0
|
||||||
, qos/0
|
, qos/0
|
||||||
|
@ -91,6 +93,8 @@
|
||||||
, int/1
|
, int/1
|
||||||
, float/1
|
, float/1
|
||||||
, map/1
|
, map/1
|
||||||
|
, bin2hexstr/1
|
||||||
|
, hexstr2bin/1
|
||||||
]).
|
]).
|
||||||
|
|
||||||
%% Data Type Validation Funcs
|
%% Data Type Validation Funcs
|
||||||
|
@ -169,6 +173,8 @@
|
||||||
, base64_decode/1
|
, base64_decode/1
|
||||||
, json_decode/1
|
, json_decode/1
|
||||||
, json_encode/1
|
, json_encode/1
|
||||||
|
, term_decode/1
|
||||||
|
, term_encode/1
|
||||||
]).
|
]).
|
||||||
|
|
||||||
%% Date functions
|
%% Date functions
|
||||||
|
@ -178,6 +184,16 @@
|
||||||
, now_timestamp/1
|
, now_timestamp/1
|
||||||
]).
|
]).
|
||||||
|
|
||||||
|
%% Proc Dict Func
|
||||||
|
-export([ proc_dict_get/1
|
||||||
|
, proc_dict_put/2
|
||||||
|
, proc_dict_del/1
|
||||||
|
, kv_store_get/1
|
||||||
|
, kv_store_get/2
|
||||||
|
, kv_store_put/2
|
||||||
|
, kv_store_del/1
|
||||||
|
]).
|
||||||
|
|
||||||
-export(['$handle_undefined_function'/2]).
|
-export(['$handle_undefined_function'/2]).
|
||||||
|
|
||||||
-compile({no_auto_import,
|
-compile({no_auto_import,
|
||||||
|
@ -495,6 +511,14 @@ float(Data) ->
|
||||||
map(Data) ->
|
map(Data) ->
|
||||||
emqx_rule_utils:map(Data).
|
emqx_rule_utils:map(Data).
|
||||||
|
|
||||||
|
bin2hexstr(Bin) when is_binary(Bin) ->
|
||||||
|
IntL = binary_to_list(Bin),
|
||||||
|
list_to_binary([io_lib:format("~2.16.0B", [Int]) || Int <- IntL]).
|
||||||
|
|
||||||
|
hexstr2bin(Str) when is_binary(Str) ->
|
||||||
|
list_to_binary([binary_to_integer(W, 16) || <<W:2/binary>> <= Str]).
|
||||||
|
|
||||||
|
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
%% NULL Funcs
|
%% NULL Funcs
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
|
@ -804,7 +828,7 @@ hexstring(<<X:256/big-unsigned-integer>>) ->
|
||||||
iolist_to_binary(io_lib:format("~64.16.0b", [X])).
|
iolist_to_binary(io_lib:format("~64.16.0b", [X])).
|
||||||
|
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
%% Base64 Funcs
|
%% Data encode and decode Funcs
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
|
|
||||||
base64_encode(Data) when is_binary(Data) ->
|
base64_encode(Data) when is_binary(Data) ->
|
||||||
|
@ -819,6 +843,40 @@ json_encode(Data) ->
|
||||||
json_decode(Data) ->
|
json_decode(Data) ->
|
||||||
emqx_json:decode(Data, [return_maps]).
|
emqx_json:decode(Data, [return_maps]).
|
||||||
|
|
||||||
|
term_encode(Term) ->
|
||||||
|
erlang:term_to_binary(Term).
|
||||||
|
|
||||||
|
term_decode(Data) when is_binary(Data) ->
|
||||||
|
erlang:binary_to_term(Data).
|
||||||
|
|
||||||
|
%%------------------------------------------------------------------------------
|
||||||
|
%% Dict Funcs
|
||||||
|
%%------------------------------------------------------------------------------
|
||||||
|
|
||||||
|
-define(DICT_KEY(KEY), {'@rule_engine', KEY}).
|
||||||
|
proc_dict_get(Key) ->
|
||||||
|
erlang:get(?DICT_KEY(Key)).
|
||||||
|
|
||||||
|
proc_dict_put(Key, Val) ->
|
||||||
|
erlang:put(?DICT_KEY(Key), Val).
|
||||||
|
|
||||||
|
proc_dict_del(Key) ->
|
||||||
|
erlang:erase(?DICT_KEY(Key)).
|
||||||
|
|
||||||
|
kv_store_put(Key, Val) ->
|
||||||
|
ets:insert(?KV_TAB, {Key, Val}).
|
||||||
|
|
||||||
|
kv_store_get(Key) ->
|
||||||
|
kv_store_get(Key, undefined).
|
||||||
|
kv_store_get(Key, Default) ->
|
||||||
|
case ets:lookup(?KV_TAB, Key) of
|
||||||
|
[{_, Val}] -> Val;
|
||||||
|
_ -> Default
|
||||||
|
end.
|
||||||
|
|
||||||
|
kv_store_del(Key) ->
|
||||||
|
ets:delete(?KV_TAB, Key).
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Date functions
|
%% Date functions
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
|
@ -387,6 +387,8 @@ delete_resource_type(Type) ->
|
||||||
init([]) ->
|
init([]) ->
|
||||||
%% Enable stats timer
|
%% Enable stats timer
|
||||||
ok = emqx_stats:update_interval(rule_registery_stats, fun update_stats/0),
|
ok = emqx_stats:update_interval(rule_registery_stats, fun update_stats/0),
|
||||||
|
ets:new(?KV_TAB, [named_table, set, public, {write_concurrency, true},
|
||||||
|
{read_concurrency, true}]),
|
||||||
{ok, #{}}.
|
{ok, #{}}.
|
||||||
|
|
||||||
handle_call(Req, _From, State) ->
|
handle_call(Req, _From, State) ->
|
||||||
|
|
|
@ -69,7 +69,9 @@ groups() ->
|
||||||
t_resource_types_cli
|
t_resource_types_cli
|
||||||
]},
|
]},
|
||||||
{funcs, [],
|
{funcs, [],
|
||||||
[t_topic_func]},
|
[t_topic_func,
|
||||||
|
t_kv_store
|
||||||
|
]},
|
||||||
{registry, [sequence],
|
{registry, [sequence],
|
||||||
[t_add_get_remove_rule,
|
[t_add_get_remove_rule,
|
||||||
t_add_get_remove_rules,
|
t_add_get_remove_rules,
|
||||||
|
@ -597,6 +599,14 @@ t_topic_func(_Config) ->
|
||||||
%%TODO:
|
%%TODO:
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
|
t_kv_store(_) ->
|
||||||
|
undefined = emqx_rule_funcs:kv_store_get(<<"abc">>),
|
||||||
|
<<"not_found">> = emqx_rule_funcs:kv_store_get(<<"abc">>, <<"not_found">>),
|
||||||
|
emqx_rule_funcs:kv_store_put(<<"abc">>, 1),
|
||||||
|
1 = emqx_rule_funcs:kv_store_get(<<"abc">>),
|
||||||
|
emqx_rule_funcs:kv_store_del(<<"abc">>),
|
||||||
|
undefined = emqx_rule_funcs:kv_store_get(<<"abc">>).
|
||||||
|
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
%% Test cases for rule registry
|
%% Test cases for rule registry
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
|
|
|
@ -143,6 +143,40 @@ t_bool(_) ->
|
||||||
?assertEqual(false, emqx_rule_funcs:bool(<<"false">>)),
|
?assertEqual(false, emqx_rule_funcs:bool(<<"false">>)),
|
||||||
?assertError({invalid_boolean, _}, emqx_rule_funcs:bool(3)).
|
?assertError({invalid_boolean, _}, emqx_rule_funcs:bool(3)).
|
||||||
|
|
||||||
|
t_proc_dict_put_get_del(_) ->
|
||||||
|
?assertEqual(undefined, emqx_rule_funcs:proc_dict_get(<<"abc">>)),
|
||||||
|
emqx_rule_funcs:proc_dict_put(<<"abc">>, 1),
|
||||||
|
?assertEqual(1, emqx_rule_funcs:proc_dict_get(<<"abc">>)),
|
||||||
|
emqx_rule_funcs:proc_dict_del(<<"abc">>),
|
||||||
|
?assertEqual(undefined, emqx_rule_funcs:proc_dict_get(<<"abc">>)).
|
||||||
|
|
||||||
|
t_term_encode(_) ->
|
||||||
|
TestData = [<<"abc">>, #{a => 1}, #{<<"3">> => [1,2,4]}],
|
||||||
|
lists:foreach(fun(Data) ->
|
||||||
|
?assertEqual(Data,
|
||||||
|
emqx_rule_funcs:term_decode(
|
||||||
|
emqx_rule_funcs:term_encode(Data)))
|
||||||
|
end, TestData).
|
||||||
|
|
||||||
|
t_hexstr2bin(_) ->
|
||||||
|
?assertEqual(<<1,2>>, emqx_rule_funcs:hexstr2bin(<<"0102">>)),
|
||||||
|
?assertEqual(<<17,33>>, emqx_rule_funcs:hexstr2bin(<<"1121">>)).
|
||||||
|
|
||||||
|
t_bin2hexstr(_) ->
|
||||||
|
?assertEqual(<<"0102">>, emqx_rule_funcs:bin2hexstr(<<1,2>>)),
|
||||||
|
?assertEqual(<<"1121">>, emqx_rule_funcs:bin2hexstr(<<17,33>>)).
|
||||||
|
|
||||||
|
t_hex_convert(_) ->
|
||||||
|
?PROPTEST(hex_convert).
|
||||||
|
|
||||||
|
hex_convert() ->
|
||||||
|
?FORALL(L, list(range(0, 255)),
|
||||||
|
begin
|
||||||
|
AbitraryBin = list_to_binary(L),
|
||||||
|
AbitraryBin == emqx_rule_funcs:hexstr2bin(
|
||||||
|
emqx_rule_funcs:bin2hexstr(AbitraryBin))
|
||||||
|
end).
|
||||||
|
|
||||||
t_is_null(_) ->
|
t_is_null(_) ->
|
||||||
?assertEqual(true, emqx_rule_funcs:is_null(undefined)),
|
?assertEqual(true, emqx_rule_funcs:is_null(undefined)),
|
||||||
?assertEqual(false, emqx_rule_funcs:is_null(a)),
|
?assertEqual(false, emqx_rule_funcs:is_null(a)),
|
||||||
|
|
Loading…
Reference in New Issue