feat: emqx_resource support cluster_call
This commit is contained in:
parent
4528508620
commit
73238ed81f
|
@ -89,7 +89,7 @@ headers(_) -> undefined.
|
|||
headers_no_content_type(type) -> map();
|
||||
headers_no_content_type(converter) ->
|
||||
fun(Headers) ->
|
||||
maps:merge(default_headers_no_content_type(), transform_header_name(Headers))
|
||||
maps:merge(default_headers_no_content_type(), transform_header_name(Headers))
|
||||
end;
|
||||
headers_no_content_type(default) -> default_headers_no_content_type();
|
||||
headers_no_content_type(_) -> undefined.
|
||||
|
@ -129,9 +129,9 @@ create(#{ method := Method
|
|||
emqx_connector_http,
|
||||
Config#{base_url => maps:remove(query, URIMap),
|
||||
pool_type => random}) of
|
||||
{ok, _} ->
|
||||
{ok, already_created} ->
|
||||
{ok, State};
|
||||
{error, already_created} ->
|
||||
{ok, _} ->
|
||||
{ok, State};
|
||||
{error, Reason} ->
|
||||
{error, Reason}
|
||||
|
@ -296,4 +296,4 @@ parse_body(<<"application/json">>, Body) ->
|
|||
parse_body(<<"application/x-www-form-urlencoded">>, Body) ->
|
||||
{ok, maps:from_list(cow_qs:parse_qs(Body))};
|
||||
parse_body(ContentType, _) ->
|
||||
{error, {unsupported_content_type, ContentType}}.
|
||||
{error, {unsupported_content_type, ContentType}}.
|
||||
|
|
|
@ -106,9 +106,9 @@ create(#{ selector := Selector
|
|||
, '_unique'], Config),
|
||||
NState = State#{selector => NSelector},
|
||||
case emqx_resource:create_local(Unique, emqx_connector_mongo, Config) of
|
||||
{ok, _} ->
|
||||
{ok, already_created} ->
|
||||
{ok, NState};
|
||||
{error, already_created} ->
|
||||
{ok, _} ->
|
||||
{ok, NState};
|
||||
{error, Reason} ->
|
||||
{error, Reason}
|
||||
|
|
|
@ -83,9 +83,9 @@ create(#{ password_hash_algorithm := Algorithm
|
|||
query_timeout => QueryTimeout,
|
||||
'_unique' => Unique},
|
||||
case emqx_resource:create_local(Unique, emqx_connector_mysql, Config) of
|
||||
{ok, _} ->
|
||||
{ok, already_created} ->
|
||||
{ok, State};
|
||||
{error, already_created} ->
|
||||
{ok, _} ->
|
||||
{ok, State};
|
||||
{error, Reason} ->
|
||||
{error, Reason}
|
||||
|
@ -131,7 +131,7 @@ authenticate(#{password := Password} = Credential,
|
|||
destroy(#{'_unique' := Unique}) ->
|
||||
_ = emqx_resource:remove_local(Unique),
|
||||
ok.
|
||||
|
||||
|
||||
%%------------------------------------------------------------------------------
|
||||
%% Internal functions
|
||||
%%------------------------------------------------------------------------------
|
||||
|
|
|
@ -71,9 +71,9 @@ create(#{ query := Query0
|
|||
salt_position => SaltPosition,
|
||||
'_unique' => Unique},
|
||||
case emqx_resource:create_local(Unique, emqx_connector_pgsql, Config) of
|
||||
{ok, _} ->
|
||||
{ok, already_created} ->
|
||||
{ok, State};
|
||||
{error, already_created} ->
|
||||
{ok, _} ->
|
||||
{ok, State};
|
||||
{error, Reason} ->
|
||||
{error, Reason}
|
||||
|
@ -119,7 +119,7 @@ authenticate(#{password := Password} = Credential,
|
|||
destroy(#{'_unique' := Unique}) ->
|
||||
_ = emqx_resource:remove_local(Unique),
|
||||
ok.
|
||||
|
||||
|
||||
%%------------------------------------------------------------------------------
|
||||
%% Internal functions
|
||||
%%------------------------------------------------------------------------------
|
||||
|
|
|
@ -89,9 +89,9 @@ create(#{ query := Query
|
|||
, '_unique'], Config),
|
||||
NState = State#{query => NQuery},
|
||||
case emqx_resource:create_local(Unique, emqx_connector_redis, Config) of
|
||||
{ok, _} ->
|
||||
{ok, already_created} ->
|
||||
{ok, NState};
|
||||
{error, already_created} ->
|
||||
{ok, _} ->
|
||||
{ok, NState};
|
||||
{error, Reason} ->
|
||||
{error, Reason}
|
||||
|
@ -176,7 +176,7 @@ check_fields(["superuser" | More], HasPassHash) ->
|
|||
check_fields(More, HasPassHash);
|
||||
check_fields([Field | _], _) ->
|
||||
error({unsupported_field, Field}).
|
||||
|
||||
|
||||
parse_key(Key) ->
|
||||
Tokens = re:split(Key, "(" ++ ?RE_PLACEHOLDER ++ ")", [{return, binary}, group, trim]),
|
||||
parse_key(Tokens, []).
|
||||
|
|
|
@ -216,8 +216,8 @@ create_resource(#{type := DB,
|
|||
Config,
|
||||
[])
|
||||
of
|
||||
{ok, already_created} -> ResourceID;
|
||||
{ok, _} -> ResourceID;
|
||||
{error, already_created} -> ResourceID;
|
||||
{error, Reason} -> {error, Reason}
|
||||
end;
|
||||
create_resource(#{type := DB,
|
||||
|
@ -228,8 +228,8 @@ create_resource(#{type := DB,
|
|||
list_to_existing_atom(io_lib:format("~s_~s",[emqx_connector, DB])),
|
||||
Config)
|
||||
of
|
||||
{ok, already_created} -> ResourceID;
|
||||
{ok, _} -> ResourceID;
|
||||
{error, already_created} -> ResourceID;
|
||||
{error, Reason} -> {error, Reason}
|
||||
end.
|
||||
|
||||
|
|
|
@ -77,10 +77,10 @@ create_bridge(#{name := Name}, Params) ->
|
|||
case emqx_resource:check_and_create(
|
||||
emqx_data_bridge:name_to_resource_id(Name),
|
||||
emqx_data_bridge:resource_type(atom(BridgeType)), maps:from_list(Config)) of
|
||||
{ok, already_created} ->
|
||||
{400, #{code => 102, message => <<"bridge already created: ", Name/binary>>}};
|
||||
{ok, Data} ->
|
||||
update_config_and_reply(Name, BridgeType, Config, Data);
|
||||
{error, already_created} ->
|
||||
{400, #{code => 102, message => <<"bridge already created: ", Name/binary>>}};
|
||||
{error, Reason0} ->
|
||||
Reason = emqx_resource_api:stringnify(Reason0),
|
||||
{500, #{code => 102, message => <<"create bridge ", Name/binary,
|
||||
|
|
|
@ -73,8 +73,8 @@ load_bridge(#{name := Name, type := Type, config := Config}) ->
|
|||
case emqx_resource:create_local(
|
||||
emqx_data_bridge:name_to_resource_id(Name),
|
||||
emqx_data_bridge:resource_type(Type), Config) of
|
||||
{ok, already_created} -> ok;
|
||||
{ok, _} -> ok;
|
||||
{error, already_created} -> ok;
|
||||
{error, Reason} ->
|
||||
error({load_bridge, Reason})
|
||||
end.
|
||||
|
|
|
@ -130,8 +130,8 @@ handle_call(reset, _From, State) ->
|
|||
|
||||
handle_call({initiate, MFA}, _From, State = #{node := Node}) ->
|
||||
case transaction(fun init_mfa/2, [Node, MFA]) of
|
||||
{atomic, {ok, TnxId}} ->
|
||||
{reply, {ok, TnxId}, State, {continue, ?CATCH_UP}};
|
||||
{atomic, {ok, TnxId, Result}} ->
|
||||
{reply, {ok, TnxId, Result}, State, {continue, ?CATCH_UP}};
|
||||
{aborted, Reason} ->
|
||||
{reply, {error, Reason}, State, {continue, ?CATCH_UP}}
|
||||
end;
|
||||
|
@ -159,8 +159,9 @@ catch_up(#{node := Node, retry_interval := RetryMs} = State) ->
|
|||
case transaction(fun get_next_mfa/1, [Node]) of
|
||||
{atomic, caught_up} -> ?TIMEOUT;
|
||||
{atomic, {still_lagging, NextId, MFA}} ->
|
||||
case apply_mfa(NextId, MFA) of
|
||||
ok ->
|
||||
{Succeed, _} = apply_mfa(NextId, MFA),
|
||||
case Succeed of
|
||||
true ->
|
||||
case transaction(fun commit/2, [Node, NextId]) of
|
||||
{atomic, ok} -> catch_up(State);
|
||||
Error ->
|
||||
|
@ -171,7 +172,7 @@ catch_up(#{node := Node, retry_interval := RetryMs} = State) ->
|
|||
error => Error}),
|
||||
RetryMs
|
||||
end;
|
||||
_Error -> RetryMs
|
||||
false -> RetryMs
|
||||
end;
|
||||
{aborted, Reason} ->
|
||||
?SLOG(error, #{
|
||||
|
@ -209,9 +210,8 @@ do_catch_up(ToTnxId, Node) ->
|
|||
CurTnxId = LastAppliedId + 1,
|
||||
[#cluster_rpc_mfa{mfa = MFA}] = mnesia:read(?CLUSTER_MFA, CurTnxId),
|
||||
case apply_mfa(CurTnxId, MFA) of
|
||||
ok -> ok = commit(Node, CurTnxId);
|
||||
{error, Reason} -> mnesia:abort(Reason);
|
||||
Other -> mnesia:abort(Other)
|
||||
{true, _Result} -> ok = commit(Node, CurTnxId);
|
||||
{false, Error} -> mnesia:abort(Error)
|
||||
end;
|
||||
[#cluster_rpc_commit{tnx_id = LastAppliedId}] ->
|
||||
Reason = lists:flatten(io_lib:format("~p catch up failed by LastAppliedId(~p) > ToTnxId(~p)",
|
||||
|
@ -243,9 +243,8 @@ init_mfa(Node, MFA) ->
|
|||
ok = mnesia:write(?CLUSTER_MFA, MFARec, write),
|
||||
ok = commit(Node, TnxId),
|
||||
case apply_mfa(TnxId, MFA) of
|
||||
ok -> {ok, TnxId};
|
||||
{error, Reason} -> mnesia:abort(Reason);
|
||||
Other -> mnesia:abort(Other)
|
||||
{true, Result} -> {ok, TnxId, Result};
|
||||
{false, Error} -> mnesia:abort(Error)
|
||||
end.
|
||||
|
||||
do_catch_up_in_one_trans(LatestId, Node) ->
|
||||
|
@ -284,15 +283,21 @@ trans_query(TnxId) ->
|
|||
apply_mfa(TnxId, {M, F, A} = MFA) ->
|
||||
try
|
||||
Res = erlang:apply(M, F, A),
|
||||
case Res =:= ok of
|
||||
true ->
|
||||
?SLOG(notice, #{msg => "succeeded to apply MFA", tnx_id => TnxId, mfa => MFA, result => ok});
|
||||
false ->
|
||||
?SLOG(error, #{msg => "failed to apply MFA", tnx_id => TnxId, mfa => MFA, result => Res})
|
||||
Succeed =
|
||||
case Res of
|
||||
ok ->
|
||||
?SLOG(notice, #{msg => "succeeded to apply MFA", tnx_id => TnxId, mfa => MFA, result => Res}),
|
||||
true;
|
||||
{ok, _} ->
|
||||
?SLOG(notice, #{msg => "succeeded to apply MFA", tnx_id => TnxId, mfa => MFA, result => Res}),
|
||||
true;
|
||||
_ ->
|
||||
?SLOG(error, #{msg => "failed to apply MFA", tnx_id => TnxId, mfa => MFA, result => Res}),
|
||||
false
|
||||
end,
|
||||
Res
|
||||
{Succeed, Res}
|
||||
catch
|
||||
C : E ->
|
||||
?SLOG(critical, #{msg => "crash to apply MFA", tnx_id => TnxId, mfa => MFA, exception => C, reason => E}),
|
||||
{error, lists:flatten(io_lib:format("TnxId(~p) apply MFA(~p) crash", [TnxId, MFA]))}
|
||||
{false, lists:flatten(io_lib:format("TnxId(~p) apply MFA(~p) crash", [TnxId, MFA]))}
|
||||
end.
|
||||
|
|
|
@ -72,7 +72,7 @@ t_base_test(_Config) ->
|
|||
?assertEqual(emqx_cluster_rpc:status(), {atomic, []}),
|
||||
Pid = self(),
|
||||
MFA = {M, F, A} = {?MODULE, echo, [Pid, test]},
|
||||
{ok, TnxId} = emqx_cluster_rpc:multicall(M, F, A),
|
||||
{ok, TnxId, ok} = emqx_cluster_rpc:multicall(M, F, A),
|
||||
{atomic, Query} = emqx_cluster_rpc:query(TnxId),
|
||||
?assertEqual(MFA, maps:get(mfa, Query)),
|
||||
?assertEqual(node(), maps:get(initiator, Query)),
|
||||
|
@ -105,7 +105,7 @@ t_commit_ok_but_apply_fail_on_other_node(_Config) ->
|
|||
emqx_cluster_rpc:reset(),
|
||||
{atomic, []} = emqx_cluster_rpc:status(),
|
||||
MFA = {M, F, A} = {?MODULE, failed_on_node, [erlang:whereis(?NODE1)]},
|
||||
{ok, _} = emqx_cluster_rpc:multicall(M, F, A),
|
||||
{ok, _, ok} = emqx_cluster_rpc:multicall(M, F, A),
|
||||
{atomic, [Status]} = emqx_cluster_rpc:status(),
|
||||
?assertEqual(MFA, maps:get(mfa, Status)),
|
||||
?assertEqual(node(), maps:get(node, Status)),
|
||||
|
@ -118,7 +118,7 @@ t_catch_up_status_handle_next_commit(_Config) ->
|
|||
emqx_cluster_rpc:reset(),
|
||||
{atomic, []} = emqx_cluster_rpc:status(),
|
||||
{M, F, A} = {?MODULE, failed_on_node_by_odd, [erlang:whereis(?NODE1)]},
|
||||
{ok, _} = emqx_cluster_rpc:multicall(M, F, A),
|
||||
{ok, _, ok} = emqx_cluster_rpc:multicall(M, F, A),
|
||||
{ok, 2} = gen_statem:call(?NODE2, {initiate, {M, F, A}}),
|
||||
ok.
|
||||
|
||||
|
@ -127,21 +127,21 @@ t_commit_ok_apply_fail_on_other_node_then_recover(_Config) ->
|
|||
{atomic, []} = emqx_cluster_rpc:status(),
|
||||
Now = erlang:system_time(second),
|
||||
{M, F, A} = {?MODULE, failed_on_other_recover_after_5_second, [erlang:whereis(?NODE1), Now]},
|
||||
{ok, _} = emqx_cluster_rpc:multicall(M, F, A),
|
||||
{ok, _} = emqx_cluster_rpc:multicall(io, format, ["test"]),
|
||||
{ok, _, ok} = emqx_cluster_rpc:multicall(M, F, A),
|
||||
{ok, _, ok} = emqx_cluster_rpc:multicall(io, format, ["test"]),
|
||||
{atomic, [Status|L]} = emqx_cluster_rpc:status(),
|
||||
?assertEqual([], L),
|
||||
?assertEqual({io, format, ["test"]}, maps:get(mfa, Status)),
|
||||
?assertEqual(node(), maps:get(node, Status)),
|
||||
sleep(4000),
|
||||
sleep(3000),
|
||||
{atomic, [Status1]} = emqx_cluster_rpc:status(),
|
||||
?assertEqual(Status, Status1),
|
||||
sleep(1600),
|
||||
sleep(2600),
|
||||
{atomic, NewStatus} = emqx_cluster_rpc:status(),
|
||||
?assertEqual(3, length(NewStatus)),
|
||||
Pid = self(),
|
||||
MFAEcho = {M1, F1, A1} = {?MODULE, echo, [Pid, test]},
|
||||
{ok, TnxId} = emqx_cluster_rpc:multicall(M1, F1, A1),
|
||||
{ok, TnxId, ok} = emqx_cluster_rpc:multicall(M1, F1, A1),
|
||||
{atomic, Query} = emqx_cluster_rpc:query(TnxId),
|
||||
?assertEqual(MFAEcho, maps:get(mfa, Query)),
|
||||
?assertEqual(node(), maps:get(initiator, Query)),
|
||||
|
@ -157,12 +157,12 @@ t_del_stale_mfa(_Config) ->
|
|||
Keys2 = lists:seq(51, 150),
|
||||
Ids =
|
||||
[begin
|
||||
{ok, TnxId} = emqx_cluster_rpc:multicall(M, F, A),
|
||||
{ok, TnxId, ok} = emqx_cluster_rpc:multicall(M, F, A),
|
||||
TnxId end || _ <- Keys],
|
||||
?assertEqual(Keys, Ids),
|
||||
Ids2 =
|
||||
[begin
|
||||
{ok, TnxId} = emqx_cluster_rpc:multicall(M, F, A),
|
||||
{ok, TnxId, ok} = emqx_cluster_rpc:multicall(M, F, A),
|
||||
TnxId end || _ <- Keys2],
|
||||
?assertEqual(Keys2, Ids2),
|
||||
sleep(1200),
|
||||
|
|
|
@ -13,32 +13,6 @@
|
|||
%% See the License for the specific language governing permissions and
|
||||
%% limitations under the License.
|
||||
%%--------------------------------------------------------------------
|
||||
-define(CLUSTER_CALL(Func, Args), ?CLUSTER_CALL(Func, Args, ok)).
|
||||
|
||||
-define(CLUSTER_CALL(Func, Args, ResParttern),
|
||||
%% ekka_mnesia:running_nodes()
|
||||
fun() ->
|
||||
case LocalResult = erlang:apply(?MODULE, Func, Args) of
|
||||
ResParttern ->
|
||||
case rpc:multicall(nodes(), ?MODULE, Func, Args, 5000) of
|
||||
{ResL, []} ->
|
||||
Filter = fun
|
||||
(ResParttern) -> false;
|
||||
({badrpc, {'EXIT', {undef, [{?MODULE, Func0, _, []}]}}})
|
||||
when Func0 =:= Func -> false;
|
||||
(_) -> true
|
||||
end,
|
||||
case lists:filter(Filter, ResL) of
|
||||
[] -> LocalResult;
|
||||
ErrL -> {error, ErrL}
|
||||
end;
|
||||
{ResL, BadNodes} ->
|
||||
{error, {failed_on_nodes, BadNodes, ResL}}
|
||||
end;
|
||||
ErrorResult ->
|
||||
{error, ErrorResult}
|
||||
end
|
||||
end()).
|
||||
|
||||
-define(SAFE_CALL(_EXP_),
|
||||
?SAFE_CALL(_EXP_, _ = do_nothing)).
|
||||
|
@ -50,4 +24,4 @@
|
|||
_EXP_ON_FAIL_,
|
||||
{error, {_EXCLASS_, _EXCPTION_, _ST_}}
|
||||
end
|
||||
end()).
|
||||
end()).
|
||||
|
|
|
@ -157,7 +157,7 @@ query_failed({_, {OnFailed, Args}}) ->
|
|||
-spec create(instance_id(), resource_type(), resource_config()) ->
|
||||
{ok, resource_data()} | {error, Reason :: term()}.
|
||||
create(InstId, ResourceType, Config) ->
|
||||
?CLUSTER_CALL(create_local, [InstId, ResourceType, Config], {ok, _}).
|
||||
cluster_call(create_local, [InstId, ResourceType, Config]).
|
||||
|
||||
-spec create_local(instance_id(), resource_type(), resource_config()) ->
|
||||
{ok, resource_data()} | {error, Reason :: term()}.
|
||||
|
@ -167,7 +167,7 @@ create_local(InstId, ResourceType, Config) ->
|
|||
-spec create_dry_run(instance_id(), resource_type(), resource_config()) ->
|
||||
ok | {error, Reason :: term()}.
|
||||
create_dry_run(InstId, ResourceType, Config) ->
|
||||
?CLUSTER_CALL(create_dry_run_local, [InstId, ResourceType, Config]).
|
||||
cluster_call(create_dry_run_local, [InstId, ResourceType, Config]).
|
||||
|
||||
-spec create_dry_run_local(instance_id(), resource_type(), resource_config()) ->
|
||||
ok | {error, Reason :: term()}.
|
||||
|
@ -177,7 +177,7 @@ create_dry_run_local(InstId, ResourceType, Config) ->
|
|||
-spec update(instance_id(), resource_type(), resource_config(), term()) ->
|
||||
{ok, resource_data()} | {error, Reason :: term()}.
|
||||
update(InstId, ResourceType, Config, Params) ->
|
||||
?CLUSTER_CALL(update_local, [InstId, ResourceType, Config, Params], {ok, _}).
|
||||
cluster_call(update_local, [InstId, ResourceType, Config, Params]).
|
||||
|
||||
-spec update_local(instance_id(), resource_type(), resource_config(), term()) ->
|
||||
{ok, resource_data()} | {error, Reason :: term()}.
|
||||
|
@ -186,7 +186,7 @@ update_local(InstId, ResourceType, Config, Params) ->
|
|||
|
||||
-spec remove(instance_id()) -> ok | {error, Reason :: term()}.
|
||||
remove(InstId) ->
|
||||
?CLUSTER_CALL(remove_local, [InstId]).
|
||||
cluster_call(remove_local, [InstId]).
|
||||
|
||||
-spec remove_local(instance_id()) -> ok | {error, Reason :: term()}.
|
||||
remove_local(InstId) ->
|
||||
|
@ -335,3 +335,9 @@ safe_apply(Func, Args) ->
|
|||
|
||||
str(S) when is_binary(S) -> binary_to_list(S);
|
||||
str(S) when is_list(S) -> S.
|
||||
|
||||
cluster_call(Func, Args) ->
|
||||
case emqx_cluster_rpc:multicall(?MODULE, Func, Args) of
|
||||
{ok, _TxnId, Result} -> Result;
|
||||
Failed -> Failed
|
||||
end.
|
||||
|
|
|
@ -162,7 +162,7 @@ do_update(InstId, ResourceType, NewConfig, Params) ->
|
|||
|
||||
do_create(InstId, ResourceType, Config) ->
|
||||
case lookup(InstId) of
|
||||
{ok, _} -> {error, already_created};
|
||||
{ok, _} -> {ok, already_created};
|
||||
_ ->
|
||||
case emqx_resource:call_start(InstId, ResourceType, Config) of
|
||||
{ok, ResourceState} ->
|
||||
|
|
|
@ -443,9 +443,9 @@ create_resource(Context, #{type := DB} = Config) ->
|
|||
ResourceID,
|
||||
list_to_existing_atom(io_lib:format("~s_~s", [emqx_connector, DB])),
|
||||
Config) of
|
||||
{ok, _} ->
|
||||
{ok, already_created} ->
|
||||
Context#{resource_id => ResourceID};
|
||||
{error, already_created} ->
|
||||
{ok, _} ->
|
||||
Context#{resource_id => ResourceID};
|
||||
{error, Reason} ->
|
||||
error({load_config_error, Reason})
|
||||
|
|
|
@ -155,22 +155,6 @@
|
|||
end
|
||||
end()).
|
||||
|
||||
-define(CLUSTER_CALL(Func, Args), ?CLUSTER_CALL(Func, Args, ok)).
|
||||
|
||||
-define(CLUSTER_CALL(Func, Args, ResParttern),
|
||||
fun() -> case rpc:multicall(ekka_mnesia:running_nodes(), ?MODULE, Func, Args, 5000) of
|
||||
{ResL, []} ->
|
||||
case lists:filter(fun(ResParttern) -> false; (_) -> true end, ResL) of
|
||||
[] -> ResL;
|
||||
ErrL ->
|
||||
?LOG(error, "cluster_call error found, ResL: ~p", [ResL]),
|
||||
throw({Func, ErrL})
|
||||
end;
|
||||
{ResL, BadNodes} ->
|
||||
?LOG(error, "cluster_call bad nodes found: ~p, ResL: ~p", [BadNodes, ResL]),
|
||||
throw({Func, {failed_on_nodes, BadNodes}})
|
||||
end end()).
|
||||
|
||||
%% Tables
|
||||
-define(RULE_TAB, emqx_rule).
|
||||
-define(ACTION_TAB, emqx_rule_action).
|
||||
|
|
|
@ -216,7 +216,7 @@ delete_rule(RuleId) ->
|
|||
case emqx_rule_registry:get_rule(RuleId) of
|
||||
{ok, Rule = #rule{actions = Actions}} ->
|
||||
try
|
||||
_ = ?CLUSTER_CALL(clear_rule, [Rule]),
|
||||
_ = emqx_rule_utils:cluster_call(?MODULE, clear_rule, [Rule]),
|
||||
ok = emqx_rule_registry:remove_rule(Rule)
|
||||
catch
|
||||
Error:Reason:ST ->
|
||||
|
@ -242,7 +242,7 @@ create_resource(#{type := Type, config := Config0} = Params) ->
|
|||
ok = emqx_rule_registry:add_resource(Resource),
|
||||
%% Note that we will return OK in case of resource creation failure,
|
||||
%% A timer is started to re-start the resource later.
|
||||
catch _ = ?CLUSTER_CALL(init_resource, [M, F, ResId, Config]),
|
||||
catch _ = emqx_rule_utils:cluster_call(?MODULE, init_resource, [M, F, ResId, Config]),
|
||||
{ok, Resource};
|
||||
not_found ->
|
||||
{error, {resource_type_not_found, Type}}
|
||||
|
@ -280,7 +280,7 @@ do_check_and_update_resource(#{id := Id, type := Type, description := NewDescrip
|
|||
Config = emqx_rule_validator:validate_params(NewConfig, ParamSpec),
|
||||
case test_resource(#{type => Type, config => NewConfig}) of
|
||||
ok ->
|
||||
_ = ?CLUSTER_CALL(init_resource, [Module, Create, Id, Config]),
|
||||
_ = emqx_rule_utils:cluster_call(?MODULE, init_resource, [Module, Create, Id, Config]),
|
||||
emqx_rule_registry:add_resource(#resource{
|
||||
id = Id,
|
||||
type = Type,
|
||||
|
@ -319,8 +319,8 @@ test_resource(#{type := Type, config := Config0}) ->
|
|||
Config = emqx_rule_validator:validate_params(Config0, ParamSpec),
|
||||
ResId = resource_id(),
|
||||
try
|
||||
_ = ?CLUSTER_CALL(init_resource, [ModC, Create, ResId, Config]),
|
||||
_ = ?CLUSTER_CALL(clear_resource, [ModD, Destroy, ResId]),
|
||||
_ = emqx_rule_utils:cluster_call(?MODULE, init_resource, [ModC, Create, ResId, Config]),
|
||||
_ = emqx_rule_utils:cluster_call(?MODULE, clear_resource, [ModD, Destroy, ResId]),
|
||||
ok
|
||||
catch
|
||||
throw:Reason -> {error, Reason}
|
||||
|
@ -359,7 +359,7 @@ delete_resource(ResId) ->
|
|||
try
|
||||
case emqx_rule_registry:remove_resource(ResId) of
|
||||
ok ->
|
||||
_ = ?CLUSTER_CALL(clear_resource, [ModD, Destroy, ResId]),
|
||||
_ = emqx_rule_utils:cluster_call(?MODULE, clear_resource, [ModD, Destroy, ResId]),
|
||||
ok;
|
||||
{error, _} = R -> R
|
||||
end
|
||||
|
@ -426,7 +426,7 @@ prepare_action(#{name := Name, args := Args0} = Action, NeedInit) ->
|
|||
ActionInstId = maps:get(id, Action, action_instance_id(Name)),
|
||||
case NeedInit of
|
||||
true ->
|
||||
_ = ?CLUSTER_CALL(init_action, [Mod, Create, ActionInstId,
|
||||
_ = emqx_rule_utils:cluster_call(?MODULE, init_action, [Mod, Create, ActionInstId,
|
||||
with_resource_params(Args)]),
|
||||
ok;
|
||||
false -> ok
|
||||
|
@ -485,7 +485,7 @@ may_update_rule_params(Rule, Params = #{on_action_failed := OnFailed}) ->
|
|||
may_update_rule_params(Rule = #rule{actions = OldActions}, Params = #{actions := Actions}) ->
|
||||
%% prepare new actions before removing old ones
|
||||
NewActions = prepare_actions(Actions, maps:get(enabled, Params, true)),
|
||||
_ = ?CLUSTER_CALL(clear_actions, [OldActions]),
|
||||
_ = emqx_rule_utils:cluster_call(?MODULE, clear_actions, [OldActions]),
|
||||
may_update_rule_params(Rule#rule{actions = NewActions}, maps:remove(actions, Params));
|
||||
may_update_rule_params(Rule, _Params) -> %% ignore all the unsupported params
|
||||
Rule.
|
||||
|
@ -631,7 +631,7 @@ refresh_actions(Actions, Pred) ->
|
|||
true ->
|
||||
{ok, #action{module = Mod, on_create = Create}}
|
||||
= emqx_rule_registry:find_action(ActName),
|
||||
_ = ?CLUSTER_CALL(init_action, [Mod, Create, Id, with_resource_params(Args)]),
|
||||
_ = emqx_rule_utils:cluster_call(?MODULE, init_action, [Mod, Create, Id, with_resource_params(Args)]),
|
||||
refresh_actions(Fallbacks, Pred);
|
||||
false -> ok
|
||||
end
|
||||
|
|
|
@ -221,7 +221,7 @@ remove_rules(Rules) ->
|
|||
|
||||
%% @private
|
||||
insert_rule(Rule) ->
|
||||
_ = ?CLUSTER_CALL(load_hooks_for_rule, [Rule]),
|
||||
_ = emqx_rule_utils:cluster_call(?MODULE, load_hooks_for_rule, [Rule]),
|
||||
mnesia:write(?RULE_TAB, Rule, write).
|
||||
|
||||
%% @private
|
||||
|
@ -231,7 +231,7 @@ delete_rule(RuleId) when is_binary(RuleId) ->
|
|||
not_found -> ok
|
||||
end;
|
||||
delete_rule(Rule) ->
|
||||
_ = ?CLUSTER_CALL(unload_hooks_for_rule, [Rule]),
|
||||
_ = emqx_rule_utils:cluster_call(?MODULE, unload_hooks_for_rule, [Rule]),
|
||||
mnesia:delete_object(?RULE_TAB, Rule, write).
|
||||
|
||||
load_hooks_for_rule(#rule{for = Topics}) ->
|
||||
|
@ -476,10 +476,11 @@ code_change(_OldVsn, State, _Extra) ->
|
|||
|
||||
get_all_records(Tab) ->
|
||||
%mnesia:dirty_match_object(Tab, mnesia:table_info(Tab, wild_pattern)).
|
||||
%% Wrapping ets to a r/o transaction to avoid reading inconsistent
|
||||
%% Wrapping ets to a transaction to avoid reading inconsistent
|
||||
%% ( nest cluster_call transaction, no a r/o transaction)
|
||||
%% data during shard bootstrap
|
||||
{atomic, Ret} =
|
||||
ekka_mnesia:ro_transaction(?RULE_ENGINE_SHARD,
|
||||
ekka_mnesia:transaction(?RULE_ENGINE_SHARD,
|
||||
fun() ->
|
||||
ets:tab2list(Tab)
|
||||
end),
|
||||
|
|
|
@ -55,6 +55,8 @@
|
|||
, can_topic_match_oneof/2
|
||||
]).
|
||||
|
||||
-export([cluster_call/3]).
|
||||
|
||||
-compile({no_auto_import,
|
||||
[ float/1
|
||||
]}).
|
||||
|
@ -356,3 +358,7 @@ can_topic_match_oneof(Topic, Filters) ->
|
|||
lists:any(fun(Fltr) ->
|
||||
emqx_topic:match(Topic, Fltr)
|
||||
end, Filters).
|
||||
|
||||
cluster_call(Module, Func, Args) ->
|
||||
{ok, _TnxId, Result} = emqx_cluster_rpc:multicall(Module, Func, Args),
|
||||
Result.
|
||||
|
|
|
@ -148,6 +148,7 @@ groups() ->
|
|||
%%------------------------------------------------------------------------------
|
||||
|
||||
init_per_suite(Config) ->
|
||||
application:load(emqx_machine),
|
||||
ok = ekka_mnesia:start(),
|
||||
ok = emqx_rule_registry:mnesia(boot),
|
||||
ok = emqx_ct_helpers:start_apps([emqx_rule_engine], fun set_special_configs/1),
|
||||
|
@ -181,6 +182,7 @@ end_per_group(_Groupname, _Config) ->
|
|||
%%------------------------------------------------------------------------------
|
||||
|
||||
init_per_testcase(t_events, Config) ->
|
||||
{ok, _} = emqx_cluster_rpc:start_link(node(), emqx_cluster_rpc, 1000),
|
||||
ok = emqx_rule_engine:load_providers(),
|
||||
init_events_counters(),
|
||||
ok = emqx_rule_registry:register_resource_types([make_simple_resource_type(simple_resource_type)]),
|
||||
|
@ -214,6 +216,7 @@ init_per_testcase(Test, Config)
|
|||
;Test =:= t_sqlselect_multi_actoins_3_1
|
||||
;Test =:= t_sqlselect_multi_actoins_4
|
||||
->
|
||||
emqx_cluster_rpc:start_link(node(), emqx_cluster_rpc, 1000),
|
||||
ok = emqx_rule_engine:load_providers(),
|
||||
ok = emqx_rule_registry:add_action(
|
||||
#action{name = 'crash_action', app = ?APP,
|
||||
|
@ -252,6 +255,7 @@ init_per_testcase(Test, Config)
|
|||
{connsql, SQL}
|
||||
| Config];
|
||||
init_per_testcase(_TestCase, Config) ->
|
||||
emqx_cluster_rpc:start_link(node(), emqx_cluster_rpc, 1000),
|
||||
ok = emqx_rule_registry:register_resource_types(
|
||||
[#resource_type{
|
||||
name = built_in,
|
||||
|
|
|
@ -39,6 +39,7 @@ groups() ->
|
|||
].
|
||||
|
||||
init_per_suite(Config) ->
|
||||
application:load(emqx_machine),
|
||||
ok = ekka_mnesia:start(),
|
||||
ok = emqx_rule_registry:mnesia(boot),
|
||||
Config.
|
||||
|
@ -65,6 +66,7 @@ end_per_testcase(_, Config) ->
|
|||
|
||||
t_restart_resource(_) ->
|
||||
{ok, _} = emqx_rule_monitor:start_link(),
|
||||
emqx_cluster_rpc:start_link(node(), emqx_cluster_rpc,1000),
|
||||
ok = emqx_rule_registry:register_resource_types(
|
||||
[#resource_type{
|
||||
name = test_res_1,
|
||||
|
|
Loading…
Reference in New Issue