diff --git a/.gitignore b/.gitignore index f59eb37a2..84f92705f 100644 --- a/.gitignore +++ b/.gitignore @@ -34,4 +34,5 @@ Mnesia.*/ *.DS_Store _checkouts rebar.config.rendered -/rebar3 \ No newline at end of file +/rebar3 +rebar.lock \ No newline at end of file diff --git a/Makefile b/Makefile index 0718d4e5a..a33d9817e 100644 --- a/Makefile +++ b/Makefile @@ -19,6 +19,14 @@ ensure-rebar3: $(REBAR): ensure-rebar3 +.PHONY: xref +xref: $(REBAR) + $(REBAR) xref + +.PHONY: dialyzer +dialyzer: $(REBAR) + $(REBAR) dialyzer + .PHONY: distclean distclean: @rm -rf _build diff --git a/README.md b/README.md index 93813e1a8..e1b2f8215 100644 --- a/README.md +++ b/README.md @@ -40,13 +40,22 @@ Get the binary package of the corresponding OS from [EMQ X Download](https://www The *EMQ X* broker requires Erlang/OTP R21+ to build since 3.0 release. +For 4.3 and later versions. + +```bash +git clone https://github.com/emqx/emqx.git +cd emqx +make +_build/emqx/rel/emqx/bin console ``` -git clone -b v4.0.0 https://github.com/emqx/emqx-rel.git -cd emqx-rel && make - -cd _build/emqx/rel/emqx && ./bin/emqx console +For earlier versions, release has to be built from another repo. +```bash +git clone https://github.com/emqx/emqx-rel.git +cd emqx-rel +make +_build/emqx/rel/emqx/bin console ``` ## Quick Start diff --git a/apps/emqx_auth_http/src/emqx_acl_http.erl b/apps/emqx_auth_http/src/emqx_acl_http.erl index a6f60b465..23588fe9a 100644 --- a/apps/emqx_auth_http/src/emqx_acl_http.erl +++ b/apps/emqx_auth_http/src/emqx_acl_http.erl @@ -36,6 +36,7 @@ -spec(register_metrics() -> ok). register_metrics() -> + io:format("testing"), lists:foreach(fun emqx_metrics:ensure/1, ?ACL_METRICS). %%-------------------------------------------------------------------- diff --git a/apps/emqx_auth_http/src/emqx_http_client.erl b/apps/emqx_auth_http/src/emqx_http_client.erl index e29d798de..217ee4b58 100644 --- a/apps/emqx_auth_http/src/emqx_http_client.erl +++ b/apps/emqx_auth_http/src/emqx_http_client.erl @@ -191,7 +191,7 @@ handle_info(Info, State) -> {noreply, State}. terminate(_Reason, #state{pool = Pool, id = Id}) -> - gropc:disconnect_worker(Pool, {Pool, Id}), + gproc_pool:disconnect_worker(Pool, {Pool, Id}), ok. code_change(_OldVsn, State, _Extra) -> @@ -253,4 +253,4 @@ flush_stream(Client, StreamRef) -> flush_stream(Client, StreamRef) after 0 -> ok - end. \ No newline at end of file + end. diff --git a/apps/emqx_management/src/emqx_mgmt.erl b/apps/emqx_management/src/emqx_mgmt.erl index b5a319278..00aa5b296 100644 --- a/apps/emqx_management/src/emqx_mgmt.erl +++ b/apps/emqx_management/src/emqx_mgmt.erl @@ -128,7 +128,6 @@ , export_auth_username/0 , export_auth_mnesia/0 , export_acl_mnesia/0 - , export_schemas/0 , import_rules/1 , import_resources/1 , import_blacklist/1 @@ -138,7 +137,6 @@ , import_auth_username/1 , import_auth_mnesia/1 , import_acl_mnesia/1 - , import_schemas/1 , to_version/1 ]). @@ -679,13 +677,6 @@ export_acl_mnesia() -> end, [], ets:tab2list(emqx_acl)) end. -export_schemas() -> - case ets:info(emqx_schema) of - undefined -> []; - _ -> - [emqx_schema_api:format_schema(Schema) || Schema <- emqx_schema_registry:get_all_schemas()] - end. - import_rules(Rules) -> lists:foreach(fun(#{<<"id">> := RuleId, <<"rawsql">> := RawSQL, @@ -788,19 +779,13 @@ import_auth_mnesia(Auths) -> import_acl_mnesia(Acls) -> case ets:info(emqx_acl) of undefined -> ok; - _ -> + _ -> [ mnesia:dirty_write({emqx_acl ,Login, Topic, Action, Allow}) || #{<<"login">> := Login, <<"topic">> := Topic, <<"action">> := Action, <<"allow">> := Allow} <- Acls ] end. -import_schemas(Schemas) -> - case ets:info(emqx_schema) of - undefined -> ok; - _ -> [emqx_schema_registry:add_schema(emqx_schema_api:make_schema_params(Schema)) || Schema <- Schemas] - end. - any_to_atom(L) when is_list(L) -> list_to_atom(L); any_to_atom(B) when is_binary(B) -> binary_to_atom(B, utf8); any_to_atom(A) when is_atom(A) -> A. diff --git a/apps/emqx_management/src/emqx_mgmt_api_data.erl b/apps/emqx_management/src/emqx_mgmt_api_data.erl index a449141ce..423504474 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_data.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_data.erl @@ -84,7 +84,6 @@ export(_Bindings, _Params) -> AuthUsername = emqx_mgmt:export_auth_username(), AuthMnesia = emqx_mgmt:export_auth_mnesia(), AclMnesia = emqx_mgmt:export_acl_mnesia(), - Schemas = emqx_mgmt:export_schemas(), Seconds = erlang:system_time(second), {{Y, M, D}, {H, MM, S}} = emqx_mgmt_util:datetime(Seconds), Filename = io_lib:format("emqx-export-~p-~p-~p-~p-~p-~p.json", [Y, M, D, H, MM, S]), @@ -100,8 +99,7 @@ export(_Bindings, _Params) -> {auth_clientid, AuthClientid}, {auth_username, AuthUsername}, {auth_mnesia, AuthMnesia}, - {acl_mnesia, AclMnesia}, - {schemas, Schemas} + {acl_mnesia, AclMnesia} ], Bin = emqx_json:encode(Data), @@ -180,20 +178,19 @@ do_import(Filename) -> case lists:member(Version, ?VERSIONS) of true -> try - emqx_mgmt:import_confs(maps:get(<<"configs">>, Data, []), maps:get(<<"listeners_state">>, Data, [])), + %emqx_mgmt:import_confs(maps:get(<<"configs">>, Data, []), maps:get(<<"listeners_state">>, Data, [])), emqx_mgmt:import_resources(maps:get(<<"resources">>, Data, [])), emqx_mgmt:import_rules(maps:get(<<"rules">>, Data, [])), emqx_mgmt:import_blacklist(maps:get(<<"blacklist">>, Data, [])), emqx_mgmt:import_applications(maps:get(<<"apps">>, Data, [])), emqx_mgmt:import_users(maps:get(<<"users">>, Data, [])), - emqx_mgmt:import_modules(maps:get(<<"modules">>, Data, [])), + %emqx_mgmt:import_modules(maps:get(<<"modules">>, Data, [])), emqx_mgmt:import_auth_clientid(maps:get(<<"auth_clientid">>, Data, [])), emqx_mgmt:import_auth_username(maps:get(<<"auth_username">>, Data, [])), - emqx_mgmt:import_auth_mnesia(maps:get(<<"auth_mnesia">>, Data, []), Version), - emqx_mgmt:import_acl_mnesia(maps:get(<<"acl_mnesia">>, Data, []), Version), - emqx_mgmt:import_schemas(maps:get(<<"schemas">>, Data, [])), + %emqx_mgmt:import_auth_mnesia(maps:get(<<"auth_mnesia">>, Data, []), Version), + %emqx_mgmt:import_acl_mnesia(maps:get(<<"acl_mnesia">>, Data, []), Version), logger:debug("The emqx data has been imported successfully"), - ok + error({not_implemented, [import_confs,import_modules,import_auth_mnesia,import_acl_mnesia]}) catch Class:Reason:Stack -> logger:error("The emqx data import failed: ~0p", [{Class,Reason,Stack}]), {error, import_failed} diff --git a/apps/emqx_management/src/emqx_mgmt_cli.erl b/apps/emqx_management/src/emqx_mgmt_cli.erl index b0870cbf3..34b64ee5d 100644 --- a/apps/emqx_management/src/emqx_mgmt_cli.erl +++ b/apps/emqx_management/src/emqx_mgmt_cli.erl @@ -567,7 +567,6 @@ data(["export"]) -> AuthUsername = emqx_mgmt:export_auth_username(), AuthMnesia = emqx_mgmt:export_auth_mnesia(), AclMnesia = emqx_mgmt:export_acl_mnesia(), - Schemas = emqx_mgmt:export_schemas(), Seconds = erlang:system_time(second), {{Y, M, D}, {H, MM, S}} = emqx_mgmt_util:datetime(Seconds), Filename = io_lib:format("emqx-export-~p-~p-~p-~p-~p-~p.json", [Y, M, D, H, MM, S]), @@ -583,8 +582,8 @@ data(["export"]) -> {auth_clientid, AuthClientID}, {auth_username, AuthUsername}, {auth_mnesia, AuthMnesia}, - {acl_mnesia, AclMnesia}, - {schemas, Schemas}], + {acl_mnesia, AclMnesia} + ], ok = filelib:ensure_dir(NFilename), case file:write_file(NFilename, emqx_json:encode(Data)) of ok -> @@ -610,7 +609,6 @@ data(["import", Filename]) -> emqx_mgmt:import_auth_username(maps:get(<<"auth_username">>, Data, [])), emqx_mgmt:import_auth_mnesia(maps:get(<<"auth_mnesia">>, Data, [])), emqx_mgmt:import_acl_mnesia(maps:get(<<"acl_mnesia">>, Data, [])), - emqx_mgmt:import_schemas(maps:get(<<"schemas">>, Data, [])), emqx_ctl:print("The emqx data has been imported successfully.~n") catch Class:Reason:Stack -> emqx_ctl:print("The emqx data import failed due: ~0p~n", [{Class,Reason,Stack}]) diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine_sup.erl b/apps/emqx_rule_engine/src/emqx_rule_engine_sup.erl index 43de01593..454c8ac6e 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine_sup.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine_sup.erl @@ -45,7 +45,7 @@ init([]) -> shutdown => 5000, type => worker, modules => [emqx_rule_metrics]}, - {ok, {{one_for_all, 10, 100}, [Registry, Metrics]}}. + {ok, {{one_for_one, 10, 10}, [Registry, Metrics]}}. start_locker() -> Locker = #{id => emqx_rule_locker, diff --git a/apps/emqx_rule_engine/src/emqx_rule_events.erl b/apps/emqx_rule_engine/src/emqx_rule_events.erl index 2ad383475..0e0f01c5d 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_events.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_events.erl @@ -360,6 +360,12 @@ printable_maps(Headers) -> fun (K, V0, AccIn) when K =:= peerhost; K =:= peername; K =:= sockname -> AccIn#{K => ntoa(V0)}; ('User-Property', V0, AccIn) when is_list(V0) -> - AccIn#{'User-Property' => maps:from_list(V0)}; + AccIn#{ + 'User-Property' => maps:from_list(V0), + 'User-Property-Pairs' => [#{ + key => Key, + value => Value + } || {Key, Value} <- V0] + }; (K, V0, AccIn) -> AccIn#{K => V0} end, #{}, Headers). diff --git a/apps/emqx_rule_engine/src/emqx_rule_funcs.erl b/apps/emqx_rule_engine/src/emqx_rule_funcs.erl index ed2e53efa..a45ac1beb 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_funcs.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_funcs.erl @@ -78,6 +78,10 @@ , bitxor/2 , bitsl/2 , bitsr/2 + , bitsize/1 + , subbits/2 + , subbits/3 + , subbits/6 ]). %% Data Type Convertion @@ -233,7 +237,7 @@ payload() -> payload(Path) -> fun(#{payload := Payload}) when erlang:is_map(Payload) -> - emqx_rule_maps:nested_get(map_path(Path), Payload); + map_get(Path, Payload); (_) -> undefined end. @@ -401,6 +405,74 @@ bitsl(X, I) when is_integer(X), is_integer(I) -> bitsr(X, I) when is_integer(X), is_integer(I) -> X bsr I. +bitsize(Bits) when is_bitstring(Bits) -> + bit_size(Bits). + +subbits(Bits, Len) when is_integer(Len), is_bitstring(Bits) -> + subbits(Bits, 1, Len). + +subbits(Bits, Start, Len) when is_integer(Start), is_integer(Len), is_bitstring(Bits) -> + get_subbits(Bits, Start, Len, <<"integer">>, <<"unsigned">>, <<"big">>). + +subbits(Bits, Start, Len, Type, Signedness, Endianness) when is_integer(Start), is_integer(Len), is_bitstring(Bits) -> + get_subbits(Bits, Start, Len, Type, Signedness, Endianness). + +get_subbits(Bits, Start, Len, Type, Signedness, Endianness) -> + Begin = Start - 1, + case Bits of + <<_:Begin, Rem/bits>> when Rem =/= <<>> -> + Sz = bit_size(Rem), + do_get_subbits(Rem, Sz, Len, Type, Signedness, Endianness); + _ -> undefined + end. + +-define(match_bits(Bits0, Pattern, ElesePattern), + case Bits0 of + Pattern -> + SubBits; + ElesePattern -> + SubBits + end). +do_get_subbits(Bits, Sz, Len, <<"integer">>, <<"unsigned">>, <<"big">>) -> + ?match_bits(Bits, <>, + <>); +do_get_subbits(Bits, Sz, Len, <<"float">>, <<"unsigned">>, <<"big">>) -> + ?match_bits(Bits, <>, + <>); +do_get_subbits(Bits, Sz, Len, <<"bits">>, <<"unsigned">>, <<"big">>) -> + ?match_bits(Bits, <>, + <>); + +do_get_subbits(Bits, Sz, Len, <<"integer">>, <<"signed">>, <<"big">>) -> + ?match_bits(Bits, <>, + <>); +do_get_subbits(Bits, Sz, Len, <<"float">>, <<"signed">>, <<"big">>) -> + ?match_bits(Bits, <>, + <>); +do_get_subbits(Bits, Sz, Len, <<"bits">>, <<"signed">>, <<"big">>) -> + ?match_bits(Bits, <>, + <>); + +do_get_subbits(Bits, Sz, Len, <<"integer">>, <<"unsigned">>, <<"little">>) -> + ?match_bits(Bits, <>, + <>); +do_get_subbits(Bits, Sz, Len, <<"float">>, <<"unsigned">>, <<"little">>) -> + ?match_bits(Bits, <>, + <>); +do_get_subbits(Bits, Sz, Len, <<"bits">>, <<"unsigned">>, <<"little">>) -> + ?match_bits(Bits, <>, + <>); + +do_get_subbits(Bits, Sz, Len, <<"integer">>, <<"signed">>, <<"little">>) -> + ?match_bits(Bits, <>, + <>); +do_get_subbits(Bits, Sz, Len, <<"float">>, <<"signed">>, <<"little">>) -> + ?match_bits(Bits, <>, + <>); +do_get_subbits(Bits, Sz, Len, <<"bits">>, <<"signed">>, <<"little">>) -> + ?match_bits(Bits, <>, + <>). + %%------------------------------------------------------------------------------ %% Data Type Convertion Funcs %%------------------------------------------------------------------------------ @@ -607,52 +679,10 @@ map_get(Key, Map) -> map_get(Key, Map, undefined). map_get(Key, Map, Default) -> - case maps:find(Key, Map) of - {ok, Val} -> Val; - error when is_atom(Key) -> - %% the map may have an equivalent binary-form key - BinKey = emqx_rule_utils:bin(Key), - case maps:find(BinKey, Map) of - {ok, Val} -> Val; - error -> Default - end; - error when is_binary(Key) -> - try %% the map may have an equivalent atom-form key - AtomKey = list_to_existing_atom(binary_to_list(Key)), - case maps:find(AtomKey, Map) of - {ok, Val} -> Val; - error -> Default - end - catch error:badarg -> - Default - end; - error -> - Default - end. + emqx_rule_maps:nested_get(map_path(Key), Map, Default). map_put(Key, Val, Map) -> - case maps:find(Key, Map) of - {ok, _} -> maps:put(Key, Val, Map); - error when is_atom(Key) -> - %% the map may have an equivalent binary-form key - BinKey = emqx_rule_utils:bin(Key), - case maps:find(BinKey, Map) of - {ok, _} -> maps:put(BinKey, Val, Map); - error -> maps:put(Key, Val, Map) - end; - error when is_binary(Key) -> - try %% the map may have an equivalent atom-form key - AtomKey = list_to_existing_atom(binary_to_list(Key)), - case maps:find(AtomKey, Map) of - {ok, _} -> maps:put(AtomKey, Val, Map); - error -> maps:put(Key, Val, Map) - end - catch error:badarg -> - maps:put(Key, Val, Map) - end; - error -> - maps:put(Key, Val, Map) - end. + emqx_rule_maps:nested_put(map_path(Key), Val, Map). mget(Key, Map) -> mget(Key, Map, undefined). diff --git a/apps/emqx_rule_engine/src/emqx_rule_runtime.erl b/apps/emqx_rule_engine/src/emqx_rule_runtime.erl index 7b40a12fc..ef6bdea12 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_runtime.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_runtime.erl @@ -234,20 +234,26 @@ take_action(#action_instance{id = Id, name = ActName, fallbacks = Fallbacks} = A = emqx_rule_registry:get_action_instance_params(Id), emqx_rule_metrics:inc_actions_taken(Id), apply_action_func(Selected, Envs, Apply, ActName) + of + {badact, Reason} -> + handle_action_failure(OnFailed, Id, Fallbacks, Selected, Envs, Reason); + Result -> Result catch error:{badfun, _Func}:_ST -> %?LOG(warning, "Action ~p maybe outdated, refresh it and try again." % "Func: ~p~nST:~0p", [Id, Func, ST]), trans_action_on(Id, fun() -> - emqx_rule_engine:refresh_actions([ActInst]) + emqx_rule_engine:refresh_actions([ActInst]) end, 5000), emqx_rule_metrics:inc_actions_retry(Id), take_action(ActInst, Selected, Envs, OnFailed, RetryN-1); Error:Reason:Stack -> + emqx_rule_metrics:inc_actions_exception(Id), handle_action_failure(OnFailed, Id, Fallbacks, Selected, Envs, {Error, Reason, Stack}) end; take_action(#action_instance{id = Id, fallbacks = Fallbacks}, Selected, Envs, OnFailed, _RetryN) -> + emqx_rule_metrics:inc_actions_error(Id), handle_action_failure(OnFailed, Id, Fallbacks, Selected, Envs, {max_try_reached, ?ActionMaxRetry}). apply_action_func(Data, Envs, #{mod := Mod, bindings := Bindings}, Name) -> @@ -284,12 +290,10 @@ wait_action_on(Id, RetryN) -> end. handle_action_failure(continue, Id, Fallbacks, Selected, Envs, Reason) -> - emqx_rule_metrics:inc_actions_exception(Id), ?LOG(error, "Take action ~p failed, continue next action, reason: ~0p", [Id, Reason]), take_actions(Fallbacks, Selected, Envs, continue), failed; handle_action_failure(stop, Id, Fallbacks, Selected, Envs, Reason) -> - emqx_rule_metrics:inc_actions_exception(Id), ?LOG(error, "Take action ~p failed, skip all actions, reason: ~0p", [Id, Reason]), take_actions(Fallbacks, Selected, Envs, continue), error({take_action_failed, {Id, Reason}}). @@ -409,11 +413,13 @@ add_metadata(Input, Metadata) when is_map(Input), is_map(Metadata) -> %%------------------------------------------------------------------------------ %% Internal Functions %%------------------------------------------------------------------------------ -may_decode_payload(Payload) -> +may_decode_payload(Payload) when is_binary(Payload) -> case get_cached_payload() of - undefined -> ensure_decoded(Payload); + undefined -> safe_decode_and_cache(Payload); DecodedP -> DecodedP - end. + end; +may_decode_payload(Payload) -> + Payload. get_cached_payload() -> erlang:get(rule_payload). @@ -422,9 +428,7 @@ cache_payload(DecodedP) -> erlang:put(rule_payload, DecodedP), DecodedP. -ensure_decoded(Json) when is_map(Json); is_list(Json) -> - Json; -ensure_decoded(MaybeJson) -> +safe_decode_and_cache(MaybeJson) -> try cache_payload(emqx_json:decode(MaybeJson, [return_maps])) catch _:_ -> #{} end. diff --git a/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl b/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl index e097e578b..8486ea16c 100644 --- a/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl +++ b/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl @@ -37,6 +37,7 @@ all() -> , {group, runtime} , {group, events} , {group, multi_actions} + , {group, bugs} ]. suite() -> @@ -123,11 +124,15 @@ groups() -> {events, [], [t_events ]}, + {bugs, [], + [t_sqlparse_payload_as + ]}, {multi_actions, [], [t_sqlselect_multi_actoins_1, t_sqlselect_multi_actoins_1_1, t_sqlselect_multi_actoins_2, t_sqlselect_multi_actoins_3, + t_sqlselect_multi_actoins_3_1, t_sqlselect_multi_actoins_4 ]} ]. @@ -200,6 +205,7 @@ init_per_testcase(Test, Config) ;Test =:= t_sqlselect_multi_actoins_1_1 ;Test =:= t_sqlselect_multi_actoins_2 ;Test =:= t_sqlselect_multi_actoins_3 + ;Test =:= t_sqlselect_multi_actoins_3_1 ;Test =:= t_sqlselect_multi_actoins_4 -> ok = emqx_rule_engine:load_providers(), @@ -209,6 +215,12 @@ init_per_testcase(Test, Config) types=[], params_spec = #{}, title = #{en => <<"Crash Action">>}, description = #{en => <<"This action will always fail!">>}}), + ok = emqx_rule_registry:add_action( + #action{name = 'failure_action', app = ?APP, + module = ?MODULE, on_create = failure_action, + types=[], params_spec = #{}, + title = #{en => <<"Crash Action">>}, + description = #{en => <<"This action will always fail!">>}}), ok = emqx_rule_registry:add_action( #action{name = 'plus_by_one', app = ?APP, module = ?MODULE, on_create = plus_by_one_action, @@ -1288,6 +1300,44 @@ t_sqlselect_multi_actoins_3(Config) -> emqx_rule_registry:remove_rule(Rule). +t_sqlselect_multi_actoins_3_1(Config) -> + %% We create 2 actions in the same rule (on_action_failed = continue): + %% The first will fail (with a 'badact' return) and we need to make sure the + %% fallback actions can be executed, and the next actoins + %% will be run without influence + {ok, Rule} = emqx_rule_engine:create_rule( + #{rawsql => ?config(connsql, Config), + on_action_failed => continue, + actions => [ + #{name => 'failure_action', args => #{}, fallbacks =>[ + #{name => 'plus_by_one', args => #{}, fallbacks =>[]}, + #{name => 'plus_by_one', args => #{}, fallbacks =>[]} + ]}, + #{name => 'republish', + args => #{<<"target_topic">> => <<"t2">>, + <<"target_qos">> => -1, + <<"payload_tmpl">> => <<"clientid=${clientid}">> + }, + fallbacks => []} + ] + }), + + (?config(conn_event, Config))(), + timer:sleep(100), + + %% verfiy the fallback actions has been run + ?assertEqual(2, ets:lookup_element(plus_by_one_action, num, 2)), + + %% verfiy the next actions can be run + receive {publish, #{topic := T, payload := Payload}} -> + ?assertEqual(<<"t2">>, T), + ?assertEqual(<<"clientid=c_emqx1">>, Payload) + after 1000 -> + ct:fail(wait_for_t2) + end, + + emqx_rule_registry:remove_rule(Rule). + t_sqlselect_multi_actoins_4(Config) -> %% We create 2 actions in the same rule (on_action_failed = continue): %% The first will fail and we need to make sure the @@ -1920,6 +1970,44 @@ t_sqlparse_new_map(_Config) -> <<"c">> := [#{}] }, Res00). +t_sqlparse_payload_as(_Config) -> + %% https://github.com/emqx/emqx/issues/3866 + Sql00 = "SELECT " + " payload, map_get('engineWorkTime', payload.params, -1) as payload.params.engineWorkTime, " + " map_get('hydOilTem', payload.params, -1) as payload.params.hydOilTem " + "FROM \"t/#\" ", + Payload1 = <<"{ \"msgId\": 1002, \"params\": { \"convertTemp\": 20, \"engineSpeed\": 42, \"hydOilTem\": 30 } }">>, + {ok, Res01} = emqx_rule_sqltester:test( + #{<<"rawsql">> => Sql00, + <<"ctx">> => #{<<"payload">> => Payload1, + <<"topic">> => <<"t/a">>}}), + ?assertMatch(#{ + <<"payload">> := #{ + <<"params">> := #{ + <<"convertTemp">> := 20, + <<"engineSpeed">> := 42, + <<"engineWorkTime">> := -1, + <<"hydOilTem">> := 30 + } + } + }, Res01), + + Payload2 = <<"{ \"msgId\": 1002, \"params\": { \"convertTemp\": 20, \"engineSpeed\": 42 } }">>, + {ok, Res02} = emqx_rule_sqltester:test( + #{<<"rawsql">> => Sql00, + <<"ctx">> => #{<<"payload">> => Payload2, + <<"topic">> => <<"t/a">>}}), + ?assertMatch(#{ + <<"payload">> := #{ + <<"params">> := #{ + <<"convertTemp">> := 20, + <<"engineSpeed">> := 42, + <<"engineWorkTime">> := -1, + <<"hydOilTem">> := -1 + } + } + }, Res02). + %%------------------------------------------------------------------------------ %% Internal helpers %%------------------------------------------------------------------------------ @@ -2006,6 +2094,12 @@ mfa_action(Id, _Params) -> mfa_action_do(_Data, _Envs, K) -> persistent_term:put(K, 1). +failure_action(_Id, _Params) -> + fun(Data, _Envs) -> + ct:pal("applying crash action, Data: ~p", [Data]), + {badact, intentional_failure} + end. + crash_action(_Id, _Params) -> fun(Data, _Envs) -> ct:pal("applying crash action, Data: ~p", [Data]), diff --git a/apps/emqx_rule_engine/test/emqx_rule_funcs_SUITE.erl b/apps/emqx_rule_engine/test/emqx_rule_funcs_SUITE.erl index 98e62f415..c50d0e02d 100644 --- a/apps/emqx_rule_engine/test/emqx_rule_funcs_SUITE.erl +++ b/apps/emqx_rule_engine/test/emqx_rule_funcs_SUITE.erl @@ -489,22 +489,75 @@ t_contains(_) -> t_map_get(_) -> ?assertEqual(1, apply_func(map_get, [<<"a">>, #{a => 1}])), - ?assertEqual(undefined, apply_func(map_get, [<<"a">>, #{}])). + ?assertEqual(undefined, apply_func(map_get, [<<"a">>, #{}])), + ?assertEqual(1, apply_func(map_get, [<<"a.b">>, #{a => #{b => 1}}])), + ?assertEqual(undefined, apply_func(map_get, [<<"a.c">>, #{a => #{b => 1}}])). t_map_put(_) -> ?assertEqual(#{<<"a">> => 1}, apply_func(map_put, [<<"a">>, 1, #{}])), - ?assertEqual(#{a => 2}, apply_func(map_put, [<<"a">>, 2, #{a => 1}])). + ?assertEqual(#{a => 2}, apply_func(map_put, [<<"a">>, 2, #{a => 1}])), + ?assertEqual(#{<<"a">> => #{<<"b">> => 1}}, apply_func(map_put, [<<"a.b">>, 1, #{}])), + ?assertEqual(#{a => #{b => 1, <<"c">> => 1}}, apply_func(map_put, [<<"a.c">>, 1, #{a => #{b => 1}}])). - t_mget(_) -> +t_mget(_) -> ?assertEqual(1, apply_func(map_get, [<<"a">>, #{a => 1}])), ?assertEqual(1, apply_func(map_get, [<<"a">>, #{<<"a">> => 1}])), ?assertEqual(undefined, apply_func(map_get, [<<"a">>, #{}])). - t_mput(_) -> +t_mput(_) -> ?assertEqual(#{<<"a">> => 1}, apply_func(map_put, [<<"a">>, 1, #{}])), ?assertEqual(#{<<"a">> => 2}, apply_func(map_put, [<<"a">>, 2, #{<<"a">> => 1}])), ?assertEqual(#{a => 2}, apply_func(map_put, [<<"a">>, 2, #{a => 1}])). +t_bitsize(_) -> + ?assertEqual(8, apply_func(bitsize, [<<"a">>])), + ?assertEqual(4, apply_func(bitsize, [<<15:4>>])). + +t_subbits(_) -> + ?assertEqual(1, apply_func(subbits, [<<255:8>>, 1])), + ?assertEqual(3, apply_func(subbits, [<<255:8>>, 2])), + ?assertEqual(7, apply_func(subbits, [<<255:8>>, 3])), + ?assertEqual(15, apply_func(subbits, [<<255:8>>, 4])), + ?assertEqual(31, apply_func(subbits, [<<255:8>>, 5])), + ?assertEqual(63, apply_func(subbits, [<<255:8>>, 6])), + ?assertEqual(127, apply_func(subbits, [<<255:8>>, 7])), + ?assertEqual(255, apply_func(subbits, [<<255:8>>, 8])). + +t_subbits2(_) -> + ?assertEqual(1, apply_func(subbits, [<<255:8>>, 1, 1])), + ?assertEqual(3, apply_func(subbits, [<<255:8>>, 1, 2])), + ?assertEqual(7, apply_func(subbits, [<<255:8>>, 1, 3])), + ?assertEqual(15, apply_func(subbits, [<<255:8>>, 1, 4])), + ?assertEqual(31, apply_func(subbits, [<<255:8>>, 1, 5])), + ?assertEqual(63, apply_func(subbits, [<<255:8>>, 1, 6])), + ?assertEqual(127, apply_func(subbits, [<<255:8>>, 1, 7])), + ?assertEqual(255, apply_func(subbits, [<<255:8>>, 1, 8])). + +t_subbits2_1(_) -> + ?assertEqual(1, apply_func(subbits, [<<255:8>>, 2, 1])), + ?assertEqual(3, apply_func(subbits, [<<255:8>>, 2, 2])), + ?assertEqual(7, apply_func(subbits, [<<255:8>>, 2, 3])), + ?assertEqual(15, apply_func(subbits, [<<255:8>>, 2, 4])), + ?assertEqual(31, apply_func(subbits, [<<255:8>>, 2, 5])), + ?assertEqual(63, apply_func(subbits, [<<255:8>>, 2, 6])), + ?assertEqual(127, apply_func(subbits, [<<255:8>>, 2, 7])), + ?assertEqual(127, apply_func(subbits, [<<255:8>>, 2, 8])). + +t_subbits2_integer(_) -> + ?assertEqual(456, apply_func(subbits, [<<456:32/integer>>, 1, 32, <<"integer">>, <<"signed">>, <<"big">>])), + ?assertEqual(-456, apply_func(subbits, [<<-456:32/integer>>, 1, 32, <<"integer">>, <<"signed">>, <<"big">>])). + +t_subbits2_float(_) -> + R = apply_func(subbits, [<<5.3:64/float>>, 1, 64, <<"float">>, <<"unsigned">>, <<"big">>]), + RL = (5.3 - R), + ct:pal(";;;;~p", [R]), + ?assert( (RL >= 0 andalso RL < 0.0001) orelse (RL =< 0 andalso RL > -0.0001)), + + R2 = apply_func(subbits, [<<-5.3:64/float>>, 1, 64, <<"float">>, <<"signed">>, <<"big">>]), + + RL2 = (5.3 + R2), + ct:pal(";;;;~p", [R2]), + ?assert( (RL2 >= 0 andalso RL2 < 0.0001) orelse (RL2 =< 0 andalso RL2 > -0.0001)). %%------------------------------------------------------------------------------ %% Test cases for Hash funcs diff --git a/rebar.config b/rebar.config index e37a56721..acaa17a35 100644 --- a/rebar.config +++ b/rebar.config @@ -14,6 +14,7 @@ {overrides,[{add,[{erl_opts,[no_debug_info,compressed,deterministic, {parse_transform,mod_vsn}]}]}]}. + {xref_checks,[undefined_function_calls,undefined_functions,locals_not_used, deprecated_function_calls,warnings_as_errors, deprecated_functions]}. @@ -37,15 +38,14 @@ , {jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.5"}}} , {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.7.1"}}} , {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.7.4"}}} - , {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.7.4"}}} + , {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.7.5"}}} , {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.5.0"}}} , {cuttlefish, {git, "https://github.com/emqx/cuttlefish", {tag, "v3.0.0"}}} - , {minirest, {git, "https://github.com/emqx/minirest", {tag, "0.3.1"}}} + , {minirest, {git, "https://github.com/emqx/minirest", {tag, "0.3.2"}}} , {ecpool, {git, "https://github.com/emqx/ecpool", {tag, "0.5.0"}}} , {replayq, {git, "https://github.com/emqx/replayq", {tag, "v0.2.0"}}} - , {erlport, {git, "https://github.com/emqx/erlport", {tag, "v1.2.2"}}} , {pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {branch, "2.0.3"}}} - , {emqtt, {git, "https://github.com/emqx/emqtt", {tag, "1.2.0"}}} + , {emqtt, {git, "https://github.com/emqx/emqtt", {tag, "1.2.3"}}} , {rulesql, {git, "https://github.com/emqx/rulesql", {tag, "0.1.2"}}} , {getopt, "1.0.1"} ]}. diff --git a/rebar.config.erl b/rebar.config.erl index 5bfcfb743..01425810a 100644 --- a/rebar.config.erl +++ b/rebar.config.erl @@ -28,7 +28,6 @@ plugins() -> test_deps() -> [ {bbmustache, "1.10.0"} - , {emqtt, {git, "https://github.com/emqx/emqtt", {tag, "1.2.0"}}} , {emqx_ct_helpers, {git, "https://github.com/emqx/emqx-ct-helpers", {tag, "1.3.0"}}} , meck ].