diff --git a/apps/emqx_authn/src/simple_authn/emqx_authn_http.erl b/apps/emqx_authn/src/simple_authn/emqx_authn_http.erl index 026df2415..43af1d9b4 100644 --- a/apps/emqx_authn/src/simple_authn/emqx_authn_http.erl +++ b/apps/emqx_authn/src/simple_authn/emqx_authn_http.erl @@ -89,7 +89,7 @@ headers(_) -> undefined. headers_no_content_type(type) -> map(); headers_no_content_type(converter) -> fun(Headers) -> - maps:merge(default_headers_no_content_type(), transform_header_name(Headers)) + maps:merge(default_headers_no_content_type(), transform_header_name(Headers)) end; headers_no_content_type(default) -> default_headers_no_content_type(); headers_no_content_type(_) -> undefined. @@ -129,9 +129,9 @@ create(#{ method := Method emqx_connector_http, Config#{base_url => maps:remove(query, URIMap), pool_type => random}) of - {ok, _} -> + {ok, already_created} -> {ok, State}; - {error, already_created} -> + {ok, _} -> {ok, State}; {error, Reason} -> {error, Reason} @@ -296,4 +296,4 @@ parse_body(<<"application/json">>, Body) -> parse_body(<<"application/x-www-form-urlencoded">>, Body) -> {ok, maps:from_list(cow_qs:parse_qs(Body))}; parse_body(ContentType, _) -> - {error, {unsupported_content_type, ContentType}}. \ No newline at end of file + {error, {unsupported_content_type, ContentType}}. diff --git a/apps/emqx_authn/src/simple_authn/emqx_authn_mongodb.erl b/apps/emqx_authn/src/simple_authn/emqx_authn_mongodb.erl index 56ced0104..1ce145f35 100644 --- a/apps/emqx_authn/src/simple_authn/emqx_authn_mongodb.erl +++ b/apps/emqx_authn/src/simple_authn/emqx_authn_mongodb.erl @@ -106,9 +106,9 @@ create(#{ selector := Selector , '_unique'], Config), NState = State#{selector => NSelector}, case emqx_resource:create_local(Unique, emqx_connector_mongo, Config) of - {ok, _} -> + {ok, already_created} -> {ok, NState}; - {error, already_created} -> + {ok, _} -> {ok, NState}; {error, Reason} -> {error, Reason} diff --git a/apps/emqx_authn/src/simple_authn/emqx_authn_mysql.erl b/apps/emqx_authn/src/simple_authn/emqx_authn_mysql.erl index 75a3392ec..59afa9671 100644 --- a/apps/emqx_authn/src/simple_authn/emqx_authn_mysql.erl +++ b/apps/emqx_authn/src/simple_authn/emqx_authn_mysql.erl @@ -83,9 +83,9 @@ create(#{ password_hash_algorithm := Algorithm query_timeout => QueryTimeout, '_unique' => Unique}, case emqx_resource:create_local(Unique, emqx_connector_mysql, Config) of - {ok, _} -> + {ok, already_created} -> {ok, State}; - {error, already_created} -> + {ok, _} -> {ok, State}; {error, Reason} -> {error, Reason} @@ -131,7 +131,7 @@ authenticate(#{password := Password} = Credential, destroy(#{'_unique' := Unique}) -> _ = emqx_resource:remove_local(Unique), ok. - + %%------------------------------------------------------------------------------ %% Internal functions %%------------------------------------------------------------------------------ diff --git a/apps/emqx_authn/src/simple_authn/emqx_authn_pgsql.erl b/apps/emqx_authn/src/simple_authn/emqx_authn_pgsql.erl index 44c7f7185..cce9ebd6f 100644 --- a/apps/emqx_authn/src/simple_authn/emqx_authn_pgsql.erl +++ b/apps/emqx_authn/src/simple_authn/emqx_authn_pgsql.erl @@ -71,9 +71,9 @@ create(#{ query := Query0 salt_position => SaltPosition, '_unique' => Unique}, case emqx_resource:create_local(Unique, emqx_connector_pgsql, Config) of - {ok, _} -> + {ok, already_created} -> {ok, State}; - {error, already_created} -> + {ok, _} -> {ok, State}; {error, Reason} -> {error, Reason} @@ -119,7 +119,7 @@ authenticate(#{password := Password} = Credential, destroy(#{'_unique' := Unique}) -> _ = emqx_resource:remove_local(Unique), ok. - + %%------------------------------------------------------------------------------ %% Internal functions %%------------------------------------------------------------------------------ diff --git a/apps/emqx_authn/src/simple_authn/emqx_authn_redis.erl b/apps/emqx_authn/src/simple_authn/emqx_authn_redis.erl index 0c2696c0e..6eff345ed 100644 --- a/apps/emqx_authn/src/simple_authn/emqx_authn_redis.erl +++ b/apps/emqx_authn/src/simple_authn/emqx_authn_redis.erl @@ -89,9 +89,9 @@ create(#{ query := Query , '_unique'], Config), NState = State#{query => NQuery}, case emqx_resource:create_local(Unique, emqx_connector_redis, Config) of - {ok, _} -> + {ok, already_created} -> {ok, NState}; - {error, already_created} -> + {ok, _} -> {ok, NState}; {error, Reason} -> {error, Reason} @@ -176,7 +176,7 @@ check_fields(["superuser" | More], HasPassHash) -> check_fields(More, HasPassHash); check_fields([Field | _], _) -> error({unsupported_field, Field}). - + parse_key(Key) -> Tokens = re:split(Key, "(" ++ ?RE_PLACEHOLDER ++ ")", [{return, binary}, group, trim]), parse_key(Tokens, []). diff --git a/apps/emqx_authz/src/emqx_authz.erl b/apps/emqx_authz/src/emqx_authz.erl index f158322e1..bbe4caa6b 100644 --- a/apps/emqx_authz/src/emqx_authz.erl +++ b/apps/emqx_authz/src/emqx_authz.erl @@ -216,8 +216,8 @@ create_resource(#{type := DB, Config, []) of + {ok, already_created} -> ResourceID; {ok, _} -> ResourceID; - {error, already_created} -> ResourceID; {error, Reason} -> {error, Reason} end; create_resource(#{type := DB, @@ -228,8 +228,8 @@ create_resource(#{type := DB, list_to_existing_atom(io_lib:format("~s_~s",[emqx_connector, DB])), Config) of + {ok, already_created} -> ResourceID; {ok, _} -> ResourceID; - {error, already_created} -> ResourceID; {error, Reason} -> {error, Reason} end. diff --git a/apps/emqx_data_bridge/src/emqx_data_bridge_api.erl b/apps/emqx_data_bridge/src/emqx_data_bridge_api.erl index dea3dcae8..6fe75e4ce 100644 --- a/apps/emqx_data_bridge/src/emqx_data_bridge_api.erl +++ b/apps/emqx_data_bridge/src/emqx_data_bridge_api.erl @@ -77,10 +77,10 @@ create_bridge(#{name := Name}, Params) -> case emqx_resource:check_and_create( emqx_data_bridge:name_to_resource_id(Name), emqx_data_bridge:resource_type(atom(BridgeType)), maps:from_list(Config)) of + {ok, already_created} -> + {400, #{code => 102, message => <<"bridge already created: ", Name/binary>>}}; {ok, Data} -> update_config_and_reply(Name, BridgeType, Config, Data); - {error, already_created} -> - {400, #{code => 102, message => <<"bridge already created: ", Name/binary>>}}; {error, Reason0} -> Reason = emqx_resource_api:stringnify(Reason0), {500, #{code => 102, message => <<"create bridge ", Name/binary, diff --git a/apps/emqx_data_bridge/src/emqx_data_bridge_monitor.erl b/apps/emqx_data_bridge/src/emqx_data_bridge_monitor.erl index d408a8062..4917833ec 100644 --- a/apps/emqx_data_bridge/src/emqx_data_bridge_monitor.erl +++ b/apps/emqx_data_bridge/src/emqx_data_bridge_monitor.erl @@ -73,8 +73,8 @@ load_bridge(#{name := Name, type := Type, config := Config}) -> case emqx_resource:create_local( emqx_data_bridge:name_to_resource_id(Name), emqx_data_bridge:resource_type(Type), Config) of + {ok, already_created} -> ok; {ok, _} -> ok; - {error, already_created} -> ok; {error, Reason} -> error({load_bridge, Reason}) end. diff --git a/apps/emqx_machine/src/emqx_cluster_rpc.erl b/apps/emqx_machine/src/emqx_cluster_rpc.erl index f7dc1eef9..4c7576ff3 100644 --- a/apps/emqx_machine/src/emqx_cluster_rpc.erl +++ b/apps/emqx_machine/src/emqx_cluster_rpc.erl @@ -130,8 +130,8 @@ handle_call(reset, _From, State) -> handle_call({initiate, MFA}, _From, State = #{node := Node}) -> case transaction(fun init_mfa/2, [Node, MFA]) of - {atomic, {ok, TnxId}} -> - {reply, {ok, TnxId}, State, {continue, ?CATCH_UP}}; + {atomic, {ok, TnxId, Result}} -> + {reply, {ok, TnxId, Result}, State, {continue, ?CATCH_UP}}; {aborted, Reason} -> {reply, {error, Reason}, State, {continue, ?CATCH_UP}} end; @@ -159,8 +159,9 @@ catch_up(#{node := Node, retry_interval := RetryMs} = State) -> case transaction(fun get_next_mfa/1, [Node]) of {atomic, caught_up} -> ?TIMEOUT; {atomic, {still_lagging, NextId, MFA}} -> - case apply_mfa(NextId, MFA) of - ok -> + {Succeed, _} = apply_mfa(NextId, MFA), + case Succeed of + true -> case transaction(fun commit/2, [Node, NextId]) of {atomic, ok} -> catch_up(State); Error -> @@ -171,7 +172,7 @@ catch_up(#{node := Node, retry_interval := RetryMs} = State) -> error => Error}), RetryMs end; - _Error -> RetryMs + false -> RetryMs end; {aborted, Reason} -> ?SLOG(error, #{ @@ -209,9 +210,8 @@ do_catch_up(ToTnxId, Node) -> CurTnxId = LastAppliedId + 1, [#cluster_rpc_mfa{mfa = MFA}] = mnesia:read(?CLUSTER_MFA, CurTnxId), case apply_mfa(CurTnxId, MFA) of - ok -> ok = commit(Node, CurTnxId); - {error, Reason} -> mnesia:abort(Reason); - Other -> mnesia:abort(Other) + {true, _Result} -> ok = commit(Node, CurTnxId); + {false, Error} -> mnesia:abort(Error) end; [#cluster_rpc_commit{tnx_id = LastAppliedId}] -> Reason = lists:flatten(io_lib:format("~p catch up failed by LastAppliedId(~p) > ToTnxId(~p)", @@ -243,9 +243,8 @@ init_mfa(Node, MFA) -> ok = mnesia:write(?CLUSTER_MFA, MFARec, write), ok = commit(Node, TnxId), case apply_mfa(TnxId, MFA) of - ok -> {ok, TnxId}; - {error, Reason} -> mnesia:abort(Reason); - Other -> mnesia:abort(Other) + {true, Result} -> {ok, TnxId, Result}; + {false, Error} -> mnesia:abort(Error) end. do_catch_up_in_one_trans(LatestId, Node) -> @@ -284,15 +283,21 @@ trans_query(TnxId) -> apply_mfa(TnxId, {M, F, A} = MFA) -> try Res = erlang:apply(M, F, A), - case Res =:= ok of - true -> - ?SLOG(notice, #{msg => "succeeded to apply MFA", tnx_id => TnxId, mfa => MFA, result => ok}); - false -> - ?SLOG(error, #{msg => "failed to apply MFA", tnx_id => TnxId, mfa => MFA, result => Res}) + Succeed = + case Res of + ok -> + ?SLOG(notice, #{msg => "succeeded to apply MFA", tnx_id => TnxId, mfa => MFA, result => Res}), + true; + {ok, _} -> + ?SLOG(notice, #{msg => "succeeded to apply MFA", tnx_id => TnxId, mfa => MFA, result => Res}), + true; + _ -> + ?SLOG(error, #{msg => "failed to apply MFA", tnx_id => TnxId, mfa => MFA, result => Res}), + false end, - Res + {Succeed, Res} catch C : E -> ?SLOG(critical, #{msg => "crash to apply MFA", tnx_id => TnxId, mfa => MFA, exception => C, reason => E}), - {error, lists:flatten(io_lib:format("TnxId(~p) apply MFA(~p) crash", [TnxId, MFA]))} + {false, lists:flatten(io_lib:format("TnxId(~p) apply MFA(~p) crash", [TnxId, MFA]))} end. diff --git a/apps/emqx_machine/test/emqx_cluster_rpc_SUITE.erl b/apps/emqx_machine/test/emqx_cluster_rpc_SUITE.erl index b91131b93..26ad28f3e 100644 --- a/apps/emqx_machine/test/emqx_cluster_rpc_SUITE.erl +++ b/apps/emqx_machine/test/emqx_cluster_rpc_SUITE.erl @@ -72,7 +72,7 @@ t_base_test(_Config) -> ?assertEqual(emqx_cluster_rpc:status(), {atomic, []}), Pid = self(), MFA = {M, F, A} = {?MODULE, echo, [Pid, test]}, - {ok, TnxId} = emqx_cluster_rpc:multicall(M, F, A), + {ok, TnxId, ok} = emqx_cluster_rpc:multicall(M, F, A), {atomic, Query} = emqx_cluster_rpc:query(TnxId), ?assertEqual(MFA, maps:get(mfa, Query)), ?assertEqual(node(), maps:get(initiator, Query)), @@ -105,7 +105,7 @@ t_commit_ok_but_apply_fail_on_other_node(_Config) -> emqx_cluster_rpc:reset(), {atomic, []} = emqx_cluster_rpc:status(), MFA = {M, F, A} = {?MODULE, failed_on_node, [erlang:whereis(?NODE1)]}, - {ok, _} = emqx_cluster_rpc:multicall(M, F, A), + {ok, _, ok} = emqx_cluster_rpc:multicall(M, F, A), {atomic, [Status]} = emqx_cluster_rpc:status(), ?assertEqual(MFA, maps:get(mfa, Status)), ?assertEqual(node(), maps:get(node, Status)), @@ -118,7 +118,7 @@ t_catch_up_status_handle_next_commit(_Config) -> emqx_cluster_rpc:reset(), {atomic, []} = emqx_cluster_rpc:status(), {M, F, A} = {?MODULE, failed_on_node_by_odd, [erlang:whereis(?NODE1)]}, - {ok, _} = emqx_cluster_rpc:multicall(M, F, A), + {ok, _, ok} = emqx_cluster_rpc:multicall(M, F, A), {ok, 2} = gen_statem:call(?NODE2, {initiate, {M, F, A}}), ok. @@ -127,21 +127,21 @@ t_commit_ok_apply_fail_on_other_node_then_recover(_Config) -> {atomic, []} = emqx_cluster_rpc:status(), Now = erlang:system_time(second), {M, F, A} = {?MODULE, failed_on_other_recover_after_5_second, [erlang:whereis(?NODE1), Now]}, - {ok, _} = emqx_cluster_rpc:multicall(M, F, A), - {ok, _} = emqx_cluster_rpc:multicall(io, format, ["test"]), + {ok, _, ok} = emqx_cluster_rpc:multicall(M, F, A), + {ok, _, ok} = emqx_cluster_rpc:multicall(io, format, ["test"]), {atomic, [Status|L]} = emqx_cluster_rpc:status(), ?assertEqual([], L), ?assertEqual({io, format, ["test"]}, maps:get(mfa, Status)), ?assertEqual(node(), maps:get(node, Status)), - sleep(4000), + sleep(3000), {atomic, [Status1]} = emqx_cluster_rpc:status(), ?assertEqual(Status, Status1), - sleep(1600), + sleep(2600), {atomic, NewStatus} = emqx_cluster_rpc:status(), ?assertEqual(3, length(NewStatus)), Pid = self(), MFAEcho = {M1, F1, A1} = {?MODULE, echo, [Pid, test]}, - {ok, TnxId} = emqx_cluster_rpc:multicall(M1, F1, A1), + {ok, TnxId, ok} = emqx_cluster_rpc:multicall(M1, F1, A1), {atomic, Query} = emqx_cluster_rpc:query(TnxId), ?assertEqual(MFAEcho, maps:get(mfa, Query)), ?assertEqual(node(), maps:get(initiator, Query)), @@ -157,12 +157,12 @@ t_del_stale_mfa(_Config) -> Keys2 = lists:seq(51, 150), Ids = [begin - {ok, TnxId} = emqx_cluster_rpc:multicall(M, F, A), + {ok, TnxId, ok} = emqx_cluster_rpc:multicall(M, F, A), TnxId end || _ <- Keys], ?assertEqual(Keys, Ids), Ids2 = [begin - {ok, TnxId} = emqx_cluster_rpc:multicall(M, F, A), + {ok, TnxId, ok} = emqx_cluster_rpc:multicall(M, F, A), TnxId end || _ <- Keys2], ?assertEqual(Keys2, Ids2), sleep(1200), diff --git a/apps/emqx_resource/include/emqx_resource_utils.hrl b/apps/emqx_resource/include/emqx_resource_utils.hrl index a20a17e89..4c3a2a749 100644 --- a/apps/emqx_resource/include/emqx_resource_utils.hrl +++ b/apps/emqx_resource/include/emqx_resource_utils.hrl @@ -13,32 +13,6 @@ %% See the License for the specific language governing permissions and %% limitations under the License. %%-------------------------------------------------------------------- --define(CLUSTER_CALL(Func, Args), ?CLUSTER_CALL(Func, Args, ok)). - --define(CLUSTER_CALL(Func, Args, ResParttern), -%% ekka_mnesia:running_nodes() - fun() -> - case LocalResult = erlang:apply(?MODULE, Func, Args) of - ResParttern -> - case rpc:multicall(nodes(), ?MODULE, Func, Args, 5000) of - {ResL, []} -> - Filter = fun - (ResParttern) -> false; - ({badrpc, {'EXIT', {undef, [{?MODULE, Func0, _, []}]}}}) - when Func0 =:= Func -> false; - (_) -> true - end, - case lists:filter(Filter, ResL) of - [] -> LocalResult; - ErrL -> {error, ErrL} - end; - {ResL, BadNodes} -> - {error, {failed_on_nodes, BadNodes, ResL}} - end; - ErrorResult -> - {error, ErrorResult} - end - end()). -define(SAFE_CALL(_EXP_), ?SAFE_CALL(_EXP_, _ = do_nothing)). @@ -50,4 +24,4 @@ _EXP_ON_FAIL_, {error, {_EXCLASS_, _EXCPTION_, _ST_}} end - end()). \ No newline at end of file + end()). diff --git a/apps/emqx_resource/src/emqx_resource.erl b/apps/emqx_resource/src/emqx_resource.erl index 1fce5e122..4bc1d20f0 100644 --- a/apps/emqx_resource/src/emqx_resource.erl +++ b/apps/emqx_resource/src/emqx_resource.erl @@ -157,7 +157,7 @@ query_failed({_, {OnFailed, Args}}) -> -spec create(instance_id(), resource_type(), resource_config()) -> {ok, resource_data()} | {error, Reason :: term()}. create(InstId, ResourceType, Config) -> - ?CLUSTER_CALL(create_local, [InstId, ResourceType, Config], {ok, _}). + cluster_call(create_local, [InstId, ResourceType, Config]). -spec create_local(instance_id(), resource_type(), resource_config()) -> {ok, resource_data()} | {error, Reason :: term()}. @@ -167,7 +167,7 @@ create_local(InstId, ResourceType, Config) -> -spec create_dry_run(instance_id(), resource_type(), resource_config()) -> ok | {error, Reason :: term()}. create_dry_run(InstId, ResourceType, Config) -> - ?CLUSTER_CALL(create_dry_run_local, [InstId, ResourceType, Config]). + cluster_call(create_dry_run_local, [InstId, ResourceType, Config]). -spec create_dry_run_local(instance_id(), resource_type(), resource_config()) -> ok | {error, Reason :: term()}. @@ -177,7 +177,7 @@ create_dry_run_local(InstId, ResourceType, Config) -> -spec update(instance_id(), resource_type(), resource_config(), term()) -> {ok, resource_data()} | {error, Reason :: term()}. update(InstId, ResourceType, Config, Params) -> - ?CLUSTER_CALL(update_local, [InstId, ResourceType, Config, Params], {ok, _}). + cluster_call(update_local, [InstId, ResourceType, Config, Params]). -spec update_local(instance_id(), resource_type(), resource_config(), term()) -> {ok, resource_data()} | {error, Reason :: term()}. @@ -186,7 +186,7 @@ update_local(InstId, ResourceType, Config, Params) -> -spec remove(instance_id()) -> ok | {error, Reason :: term()}. remove(InstId) -> - ?CLUSTER_CALL(remove_local, [InstId]). + cluster_call(remove_local, [InstId]). -spec remove_local(instance_id()) -> ok | {error, Reason :: term()}. remove_local(InstId) -> @@ -335,3 +335,9 @@ safe_apply(Func, Args) -> str(S) when is_binary(S) -> binary_to_list(S); str(S) when is_list(S) -> S. + +cluster_call(Func, Args) -> + case emqx_cluster_rpc:multicall(?MODULE, Func, Args) of + {ok, _TxnId, Result} -> Result; + Failed -> Failed + end. diff --git a/apps/emqx_resource/src/emqx_resource_instance.erl b/apps/emqx_resource/src/emqx_resource_instance.erl index 8e0624c75..84b5a1f7c 100644 --- a/apps/emqx_resource/src/emqx_resource_instance.erl +++ b/apps/emqx_resource/src/emqx_resource_instance.erl @@ -162,7 +162,7 @@ do_update(InstId, ResourceType, NewConfig, Params) -> do_create(InstId, ResourceType, Config) -> case lookup(InstId) of - {ok, _} -> {error, already_created}; + {ok, _} -> {ok, already_created}; _ -> case emqx_resource:call_start(InstId, ResourceType, Config) of {ok, ResourceState} -> diff --git a/apps/emqx_retainer/src/emqx_retainer.erl b/apps/emqx_retainer/src/emqx_retainer.erl index 7a37abbee..3fab5958d 100644 --- a/apps/emqx_retainer/src/emqx_retainer.erl +++ b/apps/emqx_retainer/src/emqx_retainer.erl @@ -443,9 +443,9 @@ create_resource(Context, #{type := DB} = Config) -> ResourceID, list_to_existing_atom(io_lib:format("~s_~s", [emqx_connector, DB])), Config) of - {ok, _} -> + {ok, already_created} -> Context#{resource_id => ResourceID}; - {error, already_created} -> + {ok, _} -> Context#{resource_id => ResourceID}; {error, Reason} -> error({load_config_error, Reason}) diff --git a/apps/emqx_rule_engine/include/rule_engine.hrl b/apps/emqx_rule_engine/include/rule_engine.hrl index 568724263..760495f6b 100644 --- a/apps/emqx_rule_engine/include/rule_engine.hrl +++ b/apps/emqx_rule_engine/include/rule_engine.hrl @@ -155,22 +155,6 @@ end end()). --define(CLUSTER_CALL(Func, Args), ?CLUSTER_CALL(Func, Args, ok)). - --define(CLUSTER_CALL(Func, Args, ResParttern), - fun() -> case rpc:multicall(ekka_mnesia:running_nodes(), ?MODULE, Func, Args, 5000) of - {ResL, []} -> - case lists:filter(fun(ResParttern) -> false; (_) -> true end, ResL) of - [] -> ResL; - ErrL -> - ?LOG(error, "cluster_call error found, ResL: ~p", [ResL]), - throw({Func, ErrL}) - end; - {ResL, BadNodes} -> - ?LOG(error, "cluster_call bad nodes found: ~p, ResL: ~p", [BadNodes, ResL]), - throw({Func, {failed_on_nodes, BadNodes}}) - end end()). - %% Tables -define(RULE_TAB, emqx_rule). -define(ACTION_TAB, emqx_rule_action). diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine.erl b/apps/emqx_rule_engine/src/emqx_rule_engine.erl index c2ccf2c29..0b3ffb603 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine.erl @@ -216,7 +216,7 @@ delete_rule(RuleId) -> case emqx_rule_registry:get_rule(RuleId) of {ok, Rule = #rule{actions = Actions}} -> try - _ = ?CLUSTER_CALL(clear_rule, [Rule]), + _ = emqx_rule_utils:cluster_call(?MODULE, clear_rule, [Rule]), ok = emqx_rule_registry:remove_rule(Rule) catch Error:Reason:ST -> @@ -242,7 +242,7 @@ create_resource(#{type := Type, config := Config0} = Params) -> ok = emqx_rule_registry:add_resource(Resource), %% Note that we will return OK in case of resource creation failure, %% A timer is started to re-start the resource later. - catch _ = ?CLUSTER_CALL(init_resource, [M, F, ResId, Config]), + catch _ = emqx_rule_utils:cluster_call(?MODULE, init_resource, [M, F, ResId, Config]), {ok, Resource}; not_found -> {error, {resource_type_not_found, Type}} @@ -280,7 +280,7 @@ do_check_and_update_resource(#{id := Id, type := Type, description := NewDescrip Config = emqx_rule_validator:validate_params(NewConfig, ParamSpec), case test_resource(#{type => Type, config => NewConfig}) of ok -> - _ = ?CLUSTER_CALL(init_resource, [Module, Create, Id, Config]), + _ = emqx_rule_utils:cluster_call(?MODULE, init_resource, [Module, Create, Id, Config]), emqx_rule_registry:add_resource(#resource{ id = Id, type = Type, @@ -319,8 +319,8 @@ test_resource(#{type := Type, config := Config0}) -> Config = emqx_rule_validator:validate_params(Config0, ParamSpec), ResId = resource_id(), try - _ = ?CLUSTER_CALL(init_resource, [ModC, Create, ResId, Config]), - _ = ?CLUSTER_CALL(clear_resource, [ModD, Destroy, ResId]), + _ = emqx_rule_utils:cluster_call(?MODULE, init_resource, [ModC, Create, ResId, Config]), + _ = emqx_rule_utils:cluster_call(?MODULE, clear_resource, [ModD, Destroy, ResId]), ok catch throw:Reason -> {error, Reason} @@ -359,7 +359,7 @@ delete_resource(ResId) -> try case emqx_rule_registry:remove_resource(ResId) of ok -> - _ = ?CLUSTER_CALL(clear_resource, [ModD, Destroy, ResId]), + _ = emqx_rule_utils:cluster_call(?MODULE, clear_resource, [ModD, Destroy, ResId]), ok; {error, _} = R -> R end @@ -426,7 +426,7 @@ prepare_action(#{name := Name, args := Args0} = Action, NeedInit) -> ActionInstId = maps:get(id, Action, action_instance_id(Name)), case NeedInit of true -> - _ = ?CLUSTER_CALL(init_action, [Mod, Create, ActionInstId, + _ = emqx_rule_utils:cluster_call(?MODULE, init_action, [Mod, Create, ActionInstId, with_resource_params(Args)]), ok; false -> ok @@ -485,7 +485,7 @@ may_update_rule_params(Rule, Params = #{on_action_failed := OnFailed}) -> may_update_rule_params(Rule = #rule{actions = OldActions}, Params = #{actions := Actions}) -> %% prepare new actions before removing old ones NewActions = prepare_actions(Actions, maps:get(enabled, Params, true)), - _ = ?CLUSTER_CALL(clear_actions, [OldActions]), + _ = emqx_rule_utils:cluster_call(?MODULE, clear_actions, [OldActions]), may_update_rule_params(Rule#rule{actions = NewActions}, maps:remove(actions, Params)); may_update_rule_params(Rule, _Params) -> %% ignore all the unsupported params Rule. @@ -631,7 +631,7 @@ refresh_actions(Actions, Pred) -> true -> {ok, #action{module = Mod, on_create = Create}} = emqx_rule_registry:find_action(ActName), - _ = ?CLUSTER_CALL(init_action, [Mod, Create, Id, with_resource_params(Args)]), + _ = emqx_rule_utils:cluster_call(?MODULE, init_action, [Mod, Create, Id, with_resource_params(Args)]), refresh_actions(Fallbacks, Pred); false -> ok end diff --git a/apps/emqx_rule_engine/src/emqx_rule_registry.erl b/apps/emqx_rule_engine/src/emqx_rule_registry.erl index c0bd5de7b..096534585 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_registry.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_registry.erl @@ -221,7 +221,7 @@ remove_rules(Rules) -> %% @private insert_rule(Rule) -> - _ = ?CLUSTER_CALL(load_hooks_for_rule, [Rule]), + _ = emqx_rule_utils:cluster_call(?MODULE, load_hooks_for_rule, [Rule]), mnesia:write(?RULE_TAB, Rule, write). %% @private @@ -231,7 +231,7 @@ delete_rule(RuleId) when is_binary(RuleId) -> not_found -> ok end; delete_rule(Rule) -> - _ = ?CLUSTER_CALL(unload_hooks_for_rule, [Rule]), + _ = emqx_rule_utils:cluster_call(?MODULE, unload_hooks_for_rule, [Rule]), mnesia:delete_object(?RULE_TAB, Rule, write). load_hooks_for_rule(#rule{for = Topics}) -> @@ -476,10 +476,11 @@ code_change(_OldVsn, State, _Extra) -> get_all_records(Tab) -> %mnesia:dirty_match_object(Tab, mnesia:table_info(Tab, wild_pattern)). - %% Wrapping ets to a r/o transaction to avoid reading inconsistent + %% Wrapping ets to a transaction to avoid reading inconsistent + %% ( nest cluster_call transaction, no a r/o transaction) %% data during shard bootstrap {atomic, Ret} = - ekka_mnesia:ro_transaction(?RULE_ENGINE_SHARD, + ekka_mnesia:transaction(?RULE_ENGINE_SHARD, fun() -> ets:tab2list(Tab) end), diff --git a/apps/emqx_rule_engine/src/emqx_rule_utils.erl b/apps/emqx_rule_engine/src/emqx_rule_utils.erl index 3791b1386..2d978ee0d 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_utils.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_utils.erl @@ -55,6 +55,8 @@ , can_topic_match_oneof/2 ]). +-export([cluster_call/3]). + -compile({no_auto_import, [ float/1 ]}). @@ -356,3 +358,7 @@ can_topic_match_oneof(Topic, Filters) -> lists:any(fun(Fltr) -> emqx_topic:match(Topic, Fltr) end, Filters). + +cluster_call(Module, Func, Args) -> + {ok, _TnxId, Result} = emqx_cluster_rpc:multicall(Module, Func, Args), + Result. diff --git a/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl b/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl index bf4dcb30e..47eb4faad 100644 --- a/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl +++ b/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl @@ -148,6 +148,7 @@ groups() -> %%------------------------------------------------------------------------------ init_per_suite(Config) -> + application:load(emqx_machine), ok = ekka_mnesia:start(), ok = emqx_rule_registry:mnesia(boot), ok = emqx_ct_helpers:start_apps([emqx_rule_engine], fun set_special_configs/1), @@ -181,6 +182,7 @@ end_per_group(_Groupname, _Config) -> %%------------------------------------------------------------------------------ init_per_testcase(t_events, Config) -> + {ok, _} = emqx_cluster_rpc:start_link(node(), emqx_cluster_rpc, 1000), ok = emqx_rule_engine:load_providers(), init_events_counters(), ok = emqx_rule_registry:register_resource_types([make_simple_resource_type(simple_resource_type)]), @@ -214,6 +216,7 @@ init_per_testcase(Test, Config) ;Test =:= t_sqlselect_multi_actoins_3_1 ;Test =:= t_sqlselect_multi_actoins_4 -> + emqx_cluster_rpc:start_link(node(), emqx_cluster_rpc, 1000), ok = emqx_rule_engine:load_providers(), ok = emqx_rule_registry:add_action( #action{name = 'crash_action', app = ?APP, @@ -252,6 +255,7 @@ init_per_testcase(Test, Config) {connsql, SQL} | Config]; init_per_testcase(_TestCase, Config) -> + emqx_cluster_rpc:start_link(node(), emqx_cluster_rpc, 1000), ok = emqx_rule_registry:register_resource_types( [#resource_type{ name = built_in, diff --git a/apps/emqx_rule_engine/test/emqx_rule_monitor_SUITE.erl b/apps/emqx_rule_engine/test/emqx_rule_monitor_SUITE.erl index 67c59e26c..62f538f43 100644 --- a/apps/emqx_rule_engine/test/emqx_rule_monitor_SUITE.erl +++ b/apps/emqx_rule_engine/test/emqx_rule_monitor_SUITE.erl @@ -39,6 +39,7 @@ groups() -> ]. init_per_suite(Config) -> + application:load(emqx_machine), ok = ekka_mnesia:start(), ok = emqx_rule_registry:mnesia(boot), Config. @@ -65,6 +66,7 @@ end_per_testcase(_, Config) -> t_restart_resource(_) -> {ok, _} = emqx_rule_monitor:start_link(), + emqx_cluster_rpc:start_link(node(), emqx_cluster_rpc,1000), ok = emqx_rule_registry:register_resource_types( [#resource_type{ name = test_res_1,