Merge pull request #6597 from terry-xiaoyu/bridge_bug_fixes_2
Bridge bug fixes 2
This commit is contained in:
commit
7643564ef1
|
@ -153,9 +153,8 @@ t_destroy(_Config) ->
|
||||||
?GLOBAL),
|
?GLOBAL),
|
||||||
|
|
||||||
% Authenticator should not be usable anymore
|
% Authenticator should not be usable anymore
|
||||||
?assertException(
|
?assertMatch(
|
||||||
error,
|
ignore,
|
||||||
_,
|
|
||||||
emqx_authn_http:authenticate(
|
emqx_authn_http:authenticate(
|
||||||
Credentials,
|
Credentials,
|
||||||
State)).
|
State)).
|
||||||
|
|
|
@ -146,9 +146,8 @@ t_destroy(_Config) ->
|
||||||
?GLOBAL),
|
?GLOBAL),
|
||||||
|
|
||||||
% Authenticator should not be usable anymore
|
% Authenticator should not be usable anymore
|
||||||
?assertException(
|
?assertMatch(
|
||||||
error,
|
ignore,
|
||||||
_,
|
|
||||||
emqx_authn_mongodb:authenticate(
|
emqx_authn_mongodb:authenticate(
|
||||||
#{username => <<"plain">>,
|
#{username => <<"plain">>,
|
||||||
password => <<"plain">>
|
password => <<"plain">>
|
||||||
|
|
|
@ -159,9 +159,8 @@ t_destroy(_Config) ->
|
||||||
?GLOBAL),
|
?GLOBAL),
|
||||||
|
|
||||||
% Authenticator should not be usable anymore
|
% Authenticator should not be usable anymore
|
||||||
?assertException(
|
?assertMatch(
|
||||||
error,
|
ignore,
|
||||||
_,
|
|
||||||
emqx_authn_mysql:authenticate(
|
emqx_authn_mysql:authenticate(
|
||||||
#{username => <<"plain">>,
|
#{username => <<"plain">>,
|
||||||
password => <<"plain">>
|
password => <<"plain">>
|
||||||
|
|
|
@ -159,9 +159,8 @@ t_destroy(_Config) ->
|
||||||
?GLOBAL),
|
?GLOBAL),
|
||||||
|
|
||||||
% Authenticator should not be usable anymore
|
% Authenticator should not be usable anymore
|
||||||
?assertException(
|
?assertMatch(
|
||||||
error,
|
ignore,
|
||||||
_,
|
|
||||||
emqx_authn_pgsql:authenticate(
|
emqx_authn_pgsql:authenticate(
|
||||||
#{username => <<"plain">>,
|
#{username => <<"plain">>,
|
||||||
password => <<"plain">>
|
password => <<"plain">>
|
||||||
|
|
|
@ -164,9 +164,8 @@ t_destroy(_Config) ->
|
||||||
?GLOBAL),
|
?GLOBAL),
|
||||||
|
|
||||||
% Authenticator should not be usable anymore
|
% Authenticator should not be usable anymore
|
||||||
?assertException(
|
?assertMatch(
|
||||||
error,
|
ignore,
|
||||||
_,
|
|
||||||
emqx_authn_redis:authenticate(
|
emqx_authn_redis:authenticate(
|
||||||
#{username => <<"plain">>,
|
#{username => <<"plain">>,
|
||||||
password => <<"plain">>
|
password => <<"plain">>
|
||||||
|
|
|
@ -80,17 +80,36 @@ unload_hook() ->
|
||||||
on_message_publish(Message = #message{topic = Topic, flags = Flags}) ->
|
on_message_publish(Message = #message{topic = Topic, flags = Flags}) ->
|
||||||
case maps:get(sys, Flags, false) of
|
case maps:get(sys, Flags, false) of
|
||||||
false ->
|
false ->
|
||||||
lists:foreach(fun (Id) ->
|
Msg = emqx_rule_events:eventmsg_publish(Message),
|
||||||
send_message(Id, emqx_rule_events:eventmsg_publish(Message))
|
send_to_egress_matched_bridges(Topic, Msg);
|
||||||
end, get_matched_bridges(Topic));
|
|
||||||
true -> ok
|
true -> ok
|
||||||
end,
|
end,
|
||||||
{ok, Message}.
|
{ok, Message}.
|
||||||
|
|
||||||
|
send_to_egress_matched_bridges(Topic, Msg) ->
|
||||||
|
lists:foreach(fun (Id) ->
|
||||||
|
try send_message(Id, Msg) of
|
||||||
|
ok -> ok;
|
||||||
|
Error -> ?SLOG(error, #{msg => "send_message_to_bridge_failed",
|
||||||
|
bridge => Id, error => Error})
|
||||||
|
catch Err:Reason:ST ->
|
||||||
|
?SLOG(error, #{msg => "send_message_to_bridge_crash",
|
||||||
|
bridge => Id, error => Err, reason => Reason,
|
||||||
|
stacktrace => ST})
|
||||||
|
end
|
||||||
|
end, get_matched_bridges(Topic)).
|
||||||
|
|
||||||
send_message(BridgeId, Message) ->
|
send_message(BridgeId, Message) ->
|
||||||
{BridgeType, BridgeName} = parse_bridge_id(BridgeId),
|
{BridgeType, BridgeName} = parse_bridge_id(BridgeId),
|
||||||
ResId = emqx_bridge:resource_id(BridgeType, BridgeName),
|
ResId = emqx_bridge:resource_id(BridgeType, BridgeName),
|
||||||
emqx_resource:query(ResId, {send_message, Message}).
|
case emqx:get_config([bridges, BridgeType, BridgeName], not_found) of
|
||||||
|
not_found ->
|
||||||
|
{error, {bridge_not_found, BridgeId}};
|
||||||
|
#{enable := true} ->
|
||||||
|
emqx_resource:query(ResId, {send_message, Message});
|
||||||
|
#{enable := false} ->
|
||||||
|
{error, {bridge_stopped, BridgeId}}
|
||||||
|
end.
|
||||||
|
|
||||||
config_key_path() ->
|
config_key_path() ->
|
||||||
[bridges].
|
[bridges].
|
||||||
|
@ -279,6 +298,8 @@ get_matched_bridges(Topic) ->
|
||||||
end, Acc0, Conf)
|
end, Acc0, Conf)
|
||||||
end, [], Bridges).
|
end, [], Bridges).
|
||||||
|
|
||||||
|
get_matched_bridge_id(#{enable := false}, _Topic, _BType, _BName, Acc) ->
|
||||||
|
Acc;
|
||||||
get_matched_bridge_id(#{local_topic := Filter}, Topic, BType, BName, Acc) ->
|
get_matched_bridge_id(#{local_topic := Filter}, Topic, BType, BName, Acc) ->
|
||||||
case emqx_topic:match(Topic, Filter) of
|
case emqx_topic:match(Topic, Filter) of
|
||||||
true -> [bridge_id(BType, BName) | Acc];
|
true -> [bridge_id(BType, BName) | Acc];
|
||||||
|
@ -309,21 +330,21 @@ parse_confs(Type, Name, #{connector := ConnId, direction := Direction} = Conf)
|
||||||
{Type, ConnName} ->
|
{Type, ConnName} ->
|
||||||
ConnectorConfs = emqx:get_config([connectors, Type, ConnName]),
|
ConnectorConfs = emqx:get_config([connectors, Type, ConnName]),
|
||||||
make_resource_confs(Direction, ConnectorConfs,
|
make_resource_confs(Direction, ConnectorConfs,
|
||||||
maps:without([connector, direction], Conf), Name);
|
maps:without([connector, direction], Conf), Type, Name);
|
||||||
{_ConnType, _ConnName} ->
|
{_ConnType, _ConnName} ->
|
||||||
error({cannot_use_connector_with_different_type, ConnId})
|
error({cannot_use_connector_with_different_type, ConnId})
|
||||||
end;
|
end;
|
||||||
parse_confs(_Type, Name, #{connector := ConnectorConfs, direction := Direction} = Conf)
|
parse_confs(Type, Name, #{connector := ConnectorConfs, direction := Direction} = Conf)
|
||||||
when is_map(ConnectorConfs) ->
|
when is_map(ConnectorConfs) ->
|
||||||
make_resource_confs(Direction, ConnectorConfs,
|
make_resource_confs(Direction, ConnectorConfs,
|
||||||
maps:without([connector, direction], Conf), Name).
|
maps:without([connector, direction], Conf), Type, Name).
|
||||||
|
|
||||||
make_resource_confs(ingress, ConnectorConfs, BridgeConf, Name) ->
|
make_resource_confs(ingress, ConnectorConfs, BridgeConf, Type, Name) ->
|
||||||
BName = bin(Name),
|
BName = bridge_id(Type, Name),
|
||||||
ConnectorConfs#{
|
ConnectorConfs#{
|
||||||
ingress => BridgeConf#{hookpoint => <<"$bridges/", BName/binary>>}
|
ingress => BridgeConf#{hookpoint => <<"$bridges/", BName/binary>>}
|
||||||
};
|
};
|
||||||
make_resource_confs(egress, ConnectorConfs, BridgeConf, _Name) ->
|
make_resource_confs(egress, ConnectorConfs, BridgeConf, _Type, _Name) ->
|
||||||
ConnectorConfs#{
|
ConnectorConfs#{
|
||||||
egress => BridgeConf
|
egress => BridgeConf
|
||||||
}.
|
}.
|
||||||
|
|
|
@ -68,7 +68,6 @@ How long will the HTTP request timeout.
|
||||||
|
|
||||||
fields("post") ->
|
fields("post") ->
|
||||||
[ type_field()
|
[ type_field()
|
||||||
, name_field()
|
|
||||||
] ++ fields("bridge");
|
] ++ fields("bridge");
|
||||||
|
|
||||||
fields("put") ->
|
fields("put") ->
|
||||||
|
@ -103,8 +102,5 @@ id_field() ->
|
||||||
type_field() ->
|
type_field() ->
|
||||||
{type, mk(http, #{desc => "The Bridge Type"})}.
|
{type, mk(http, #{desc => "The Bridge Type"})}.
|
||||||
|
|
||||||
name_field() ->
|
|
||||||
{name, mk(binary(), #{desc => "The Bridge Name"})}.
|
|
||||||
|
|
||||||
method() ->
|
method() ->
|
||||||
enum([post, put, get, delete]).
|
enum([post, put, get, delete]).
|
||||||
|
|
|
@ -24,11 +24,9 @@ fields("egress") ->
|
||||||
|
|
||||||
fields("post_ingress") ->
|
fields("post_ingress") ->
|
||||||
[ type_field()
|
[ type_field()
|
||||||
, name_field()
|
|
||||||
] ++ proplists:delete(enable, fields("ingress"));
|
] ++ proplists:delete(enable, fields("ingress"));
|
||||||
fields("post_egress") ->
|
fields("post_egress") ->
|
||||||
[ type_field()
|
[ type_field()
|
||||||
, name_field()
|
|
||||||
] ++ proplists:delete(enable, fields("egress"));
|
] ++ proplists:delete(enable, fields("egress"));
|
||||||
|
|
||||||
fields("put_ingress") ->
|
fields("put_ingress") ->
|
||||||
|
@ -49,9 +47,3 @@ id_field() ->
|
||||||
|
|
||||||
type_field() ->
|
type_field() ->
|
||||||
{type, mk(mqtt, #{desc => "The Bridge Type"})}.
|
{type, mk(mqtt, #{desc => "The Bridge Type"})}.
|
||||||
|
|
||||||
name_field() ->
|
|
||||||
{name, mk(binary(),
|
|
||||||
#{ desc => "The Bridge Name"
|
|
||||||
, example => "some_bridge_name"
|
|
||||||
})}.
|
|
||||||
|
|
|
@ -68,10 +68,6 @@ fields("put") ->
|
||||||
|
|
||||||
fields("post") ->
|
fields("post") ->
|
||||||
[ {type, mk(mqtt, #{desc => "The Connector Type"})}
|
[ {type, mk(mqtt, #{desc => "The Connector Type"})}
|
||||||
, {name, mk(binary(),
|
|
||||||
#{ desc => "The Connector Name"
|
|
||||||
, example => <<"my_mqtt_connector">>
|
|
||||||
})}
|
|
||||||
] ++ fields("put").
|
] ++ fields("put").
|
||||||
|
|
||||||
%% ===================================================================
|
%% ===================================================================
|
||||||
|
|
|
@ -173,15 +173,27 @@ handle_publish(Msg, Vars) ->
|
||||||
_ = erlang:apply(Mod, Func, [Msg | Args]);
|
_ = erlang:apply(Mod, Func, [Msg | Args]);
|
||||||
_ -> ok
|
_ -> ok
|
||||||
end,
|
end,
|
||||||
case maps:get(local_topic, Vars, undefined) of
|
maybe_publish_to_local_broker(Msg, Vars).
|
||||||
undefined -> ok;
|
|
||||||
_Topic ->
|
|
||||||
emqx_broker:publish(emqx_connector_mqtt_msg:to_broker_msg(Msg, Vars))
|
|
||||||
end.
|
|
||||||
|
|
||||||
handle_disconnected(Reason, Parent) ->
|
handle_disconnected(Reason, Parent) ->
|
||||||
Parent ! {disconnected, self(), Reason}.
|
Parent ! {disconnected, self(), Reason}.
|
||||||
|
|
||||||
|
maybe_publish_to_local_broker(#{topic := Topic} = Msg, #{remote_topic := SubTopic} = Vars) ->
|
||||||
|
case maps:get(local_topic, Vars, undefined) of
|
||||||
|
undefined ->
|
||||||
|
%% local topic is not set, discard it
|
||||||
|
ok;
|
||||||
|
_ ->
|
||||||
|
case emqx_topic:match(Topic, SubTopic) of
|
||||||
|
true ->
|
||||||
|
_ = emqx_broker:publish(emqx_connector_mqtt_msg:to_broker_msg(Msg, Vars)),
|
||||||
|
ok;
|
||||||
|
false ->
|
||||||
|
?SLOG(warning, #{msg => "discard_message_as_topic_not_matched",
|
||||||
|
message => Msg, subscribed => SubTopic, got_topic => Topic})
|
||||||
|
end
|
||||||
|
end.
|
||||||
|
|
||||||
make_hdlr(Parent, Vars) ->
|
make_hdlr(Parent, Vars) ->
|
||||||
#{puback => {fun ?MODULE:handle_puback/2, [Parent]},
|
#{puback => {fun ?MODULE:handle_puback/2, [Parent]},
|
||||||
publish => {fun ?MODULE:handle_publish/2, [Vars]},
|
publish => {fun ?MODULE:handle_publish/2, [Vars]},
|
||||||
|
|
|
@ -25,7 +25,7 @@
|
||||||
mod := module(),
|
mod := module(),
|
||||||
config := resource_config(),
|
config := resource_config(),
|
||||||
state := resource_state(),
|
state := resource_state(),
|
||||||
status := started | stopped,
|
status := started | stopped | starting,
|
||||||
metrics := emqx_plugin_libs_metrics:metrics()
|
metrics := emqx_plugin_libs_metrics:metrics()
|
||||||
}.
|
}.
|
||||||
-type resource_group() :: binary().
|
-type resource_group() :: binary().
|
||||||
|
@ -41,3 +41,5 @@
|
||||||
%% the `after_query_fun()` is mainly for callbacks that increment counters or do some fallback
|
%% the `after_query_fun()` is mainly for callbacks that increment counters or do some fallback
|
||||||
%% actions upon query failure
|
%% actions upon query failure
|
||||||
-type after_query_fun() :: {fun((...) -> ok), Args :: [term()]}.
|
-type after_query_fun() :: {fun((...) -> ok), Args :: [term()]}.
|
||||||
|
|
||||||
|
-define(TEST_ID_PREFIX, "_test_:").
|
||||||
|
|
|
@ -82,7 +82,6 @@
|
||||||
]).
|
]).
|
||||||
|
|
||||||
-define(HOCON_CHECK_OPTS, #{atom_key => true, nullable => true}).
|
-define(HOCON_CHECK_OPTS, #{atom_key => true, nullable => true}).
|
||||||
|
|
||||||
-define(DEFAULT_RESOURCE_GROUP, <<"default">>).
|
-define(DEFAULT_RESOURCE_GROUP, <<"default">>).
|
||||||
|
|
||||||
-optional_callbacks([ on_query/4
|
-optional_callbacks([ on_query/4
|
||||||
|
@ -170,7 +169,7 @@ create_dry_run(ResourceType, Config) ->
|
||||||
-spec create_dry_run_local(resource_type(), resource_config()) ->
|
-spec create_dry_run_local(resource_type(), resource_config()) ->
|
||||||
ok | {error, Reason :: term()}.
|
ok | {error, Reason :: term()}.
|
||||||
create_dry_run_local(ResourceType, Config) ->
|
create_dry_run_local(ResourceType, Config) ->
|
||||||
InstId = iolist_to_binary(emqx_misc:gen_id(16)),
|
InstId = emqx_resource_instance:make_test_id(),
|
||||||
call_instance(InstId, {create_dry_run, InstId, ResourceType, Config}).
|
call_instance(InstId, {create_dry_run, InstId, ResourceType, Config}).
|
||||||
|
|
||||||
-spec recreate(instance_id(), resource_type(), resource_config(), term()) ->
|
-spec recreate(instance_id(), resource_type(), resource_config(), term()) ->
|
||||||
|
@ -201,14 +200,18 @@ query(InstId, Request) ->
|
||||||
-spec query(instance_id(), Request :: term(), after_query()) -> Result :: term().
|
-spec query(instance_id(), Request :: term(), after_query()) -> Result :: term().
|
||||||
query(InstId, Request, AfterQuery) ->
|
query(InstId, Request, AfterQuery) ->
|
||||||
case get_instance(InstId) of
|
case get_instance(InstId) of
|
||||||
|
{ok, #{status := starting}} ->
|
||||||
|
query_error(starting, <<"cannot serve query when the resource "
|
||||||
|
"instance is still starting">>);
|
||||||
{ok, #{status := stopped}} ->
|
{ok, #{status := stopped}} ->
|
||||||
error({resource_stopped, InstId});
|
query_error(stopped, <<"cannot serve query when the resource "
|
||||||
|
"instance is stopped">>);
|
||||||
{ok, #{mod := Mod, state := ResourceState, status := started}} ->
|
{ok, #{mod := Mod, state := ResourceState, status := started}} ->
|
||||||
%% the resource state is readonly to Module:on_query/4
|
%% the resource state is readonly to Module:on_query/4
|
||||||
%% and the `after_query()` functions should be thread safe
|
%% and the `after_query()` functions should be thread safe
|
||||||
Mod:on_query(InstId, Request, AfterQuery, ResourceState);
|
Mod:on_query(InstId, Request, AfterQuery, ResourceState);
|
||||||
{error, Reason} ->
|
{error, not_found} ->
|
||||||
error({get_instance, {InstId, Reason}})
|
query_error(not_found, <<"the resource id not exists">>)
|
||||||
end.
|
end.
|
||||||
|
|
||||||
-spec restart(instance_id()) -> ok | {error, Reason :: term()}.
|
-spec restart(instance_id()) -> ok | {error, Reason :: term()}.
|
||||||
|
@ -368,3 +371,6 @@ cluster_call(Func, Args) ->
|
||||||
{ok, _TxnId, Result} -> Result;
|
{ok, _TxnId, Result} -> Result;
|
||||||
Failed -> Failed
|
Failed -> Failed
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
query_error(Reason, Msg) ->
|
||||||
|
{error, {?MODULE, #{reason => Reason, msg => Msg}}}.
|
||||||
|
|
|
@ -26,6 +26,7 @@
|
||||||
-export([ lookup/1
|
-export([ lookup/1
|
||||||
, get_metrics/1
|
, get_metrics/1
|
||||||
, list_all/0
|
, list_all/0
|
||||||
|
, make_test_id/0
|
||||||
]).
|
]).
|
||||||
|
|
||||||
-export([ hash_call/2
|
-export([ hash_call/2
|
||||||
|
@ -61,7 +62,7 @@ hash_call(InstId, Request) ->
|
||||||
hash_call(InstId, Request, Timeout) ->
|
hash_call(InstId, Request, Timeout) ->
|
||||||
gen_server:call(pick(InstId), Request, Timeout).
|
gen_server:call(pick(InstId), Request, Timeout).
|
||||||
|
|
||||||
-spec lookup(instance_id()) -> {ok, resource_data()} | {error, Reason :: term()}.
|
-spec lookup(instance_id()) -> {ok, resource_data()} | {error, not_found}.
|
||||||
lookup(InstId) ->
|
lookup(InstId) ->
|
||||||
case ets:lookup(emqx_resource_instance, InstId) of
|
case ets:lookup(emqx_resource_instance, InstId) of
|
||||||
[] -> {error, not_found};
|
[] -> {error, not_found};
|
||||||
|
@ -69,6 +70,10 @@ lookup(InstId) ->
|
||||||
{ok, Data#{id => InstId, metrics => get_metrics(InstId)}}
|
{ok, Data#{id => InstId, metrics => get_metrics(InstId)}}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
make_test_id() ->
|
||||||
|
RandId = iolist_to_binary(emqx_misc:gen_id(16)),
|
||||||
|
<<?TEST_ID_PREFIX, RandId/binary>>.
|
||||||
|
|
||||||
get_metrics(InstId) ->
|
get_metrics(InstId) ->
|
||||||
emqx_plugin_libs_metrics:get_metrics(resource_metrics, InstId).
|
emqx_plugin_libs_metrics:get_metrics(resource_metrics, InstId).
|
||||||
|
|
||||||
|
@ -146,7 +151,7 @@ do_recreate(InstId, ResourceType, NewConfig, Params) ->
|
||||||
{ok, #{mod := ResourceType, state := ResourceState, config := OldConfig}} ->
|
{ok, #{mod := ResourceType, state := ResourceState, config := OldConfig}} ->
|
||||||
Config = emqx_resource:call_config_merge(ResourceType, OldConfig,
|
Config = emqx_resource:call_config_merge(ResourceType, OldConfig,
|
||||||
NewConfig, Params),
|
NewConfig, Params),
|
||||||
TestInstId = iolist_to_binary(emqx_misc:gen_id(16)),
|
TestInstId = make_test_id(),
|
||||||
case do_create_dry_run(TestInstId, ResourceType, Config) of
|
case do_create_dry_run(TestInstId, ResourceType, Config) of
|
||||||
ok ->
|
ok ->
|
||||||
do_remove(ResourceType, InstId, ResourceState, false),
|
do_remove(ResourceType, InstId, ResourceState, false),
|
||||||
|
@ -166,7 +171,9 @@ do_create(InstId, ResourceType, Config, Opts) ->
|
||||||
{ok, _} -> {ok, already_created};
|
{ok, _} -> {ok, already_created};
|
||||||
_ ->
|
_ ->
|
||||||
Res0 = #{id => InstId, mod => ResourceType, config => Config,
|
Res0 = #{id => InstId, mod => ResourceType, config => Config,
|
||||||
status => stopped, state => undefined},
|
status => starting, state => undefined},
|
||||||
|
%% The `emqx_resource:call_start/3` need the instance exist beforehand
|
||||||
|
ets:insert(emqx_resource_instance, {InstId, Res0}),
|
||||||
case emqx_resource:call_start(InstId, ResourceType, Config) of
|
case emqx_resource:call_start(InstId, ResourceType, Config) of
|
||||||
{ok, ResourceState} ->
|
{ok, ResourceState} ->
|
||||||
ok = emqx_plugin_libs_metrics:create_metrics(resource_metrics, InstId),
|
ok = emqx_plugin_libs_metrics:create_metrics(resource_metrics, InstId),
|
||||||
|
@ -181,6 +188,7 @@ do_create(InstId, ResourceType, Config, Opts) ->
|
||||||
ets:insert(emqx_resource_instance, {InstId, Res0}),
|
ets:insert(emqx_resource_instance, {InstId, Res0}),
|
||||||
{ok, Res0};
|
{ok, Res0};
|
||||||
{error, Reason} when ForceCreate == false ->
|
{error, Reason} when ForceCreate == false ->
|
||||||
|
ets:delete(emqx_resource_instance, InstId),
|
||||||
{error, Reason}
|
{error, Reason}
|
||||||
end
|
end
|
||||||
end.
|
end.
|
||||||
|
|
|
@ -96,9 +96,7 @@ t_query(_) ->
|
||||||
?assert(false)
|
?assert(false)
|
||||||
end,
|
end,
|
||||||
|
|
||||||
?assertException(
|
?assertMatch({error, {emqx_resource, #{reason := not_found}}},
|
||||||
error,
|
|
||||||
{get_instance, _Reason},
|
|
||||||
emqx_resource:query(<<"unknown">>, get_state)),
|
emqx_resource:query(<<"unknown">>, get_state)),
|
||||||
|
|
||||||
ok = emqx_resource:remove_local(?ID).
|
ok = emqx_resource:remove_local(?ID).
|
||||||
|
@ -142,7 +140,8 @@ t_stop_start(_) ->
|
||||||
|
|
||||||
?assertNot(is_process_alive(Pid0)),
|
?assertNot(is_process_alive(Pid0)),
|
||||||
|
|
||||||
?assertException(error, {resource_stopped, ?ID}, emqx_resource:query(?ID, get_state)),
|
?assertMatch({error, {emqx_resource, #{reason := stopped}}},
|
||||||
|
emqx_resource:query(?ID, get_state)),
|
||||||
|
|
||||||
ok = emqx_resource:restart(?ID),
|
ok = emqx_resource:restart(?ID),
|
||||||
|
|
||||||
|
|
|
@ -101,7 +101,7 @@ do_apply_rule(#{
|
||||||
true ->
|
true ->
|
||||||
ok = emqx_plugin_libs_metrics:inc_matched(rule_metrics, RuleId),
|
ok = emqx_plugin_libs_metrics:inc_matched(rule_metrics, RuleId),
|
||||||
Collection2 = filter_collection(Input, InCase, DoEach, Collection),
|
Collection2 = filter_collection(Input, InCase, DoEach, Collection),
|
||||||
{ok, [handle_output_list(Outputs, Coll, Input) || Coll <- Collection2]};
|
{ok, [handle_output_list(RuleId, Outputs, Coll, Input) || Coll <- Collection2]};
|
||||||
false ->
|
false ->
|
||||||
{error, nomatch}
|
{error, nomatch}
|
||||||
end;
|
end;
|
||||||
|
@ -118,7 +118,7 @@ do_apply_rule(#{id := RuleId,
|
||||||
{match_conditions_error, {_EXCLASS_,_EXCPTION_,_ST_}}) of
|
{match_conditions_error, {_EXCLASS_,_EXCPTION_,_ST_}}) of
|
||||||
true ->
|
true ->
|
||||||
ok = emqx_plugin_libs_metrics:inc_matched(rule_metrics, RuleId),
|
ok = emqx_plugin_libs_metrics:inc_matched(rule_metrics, RuleId),
|
||||||
{ok, handle_output_list(Outputs, Selected, Input)};
|
{ok, handle_output_list(RuleId, Outputs, Selected, Input)};
|
||||||
false ->
|
false ->
|
||||||
{error, nomatch}
|
{error, nomatch}
|
||||||
end.
|
end.
|
||||||
|
@ -231,15 +231,17 @@ number(Bin) ->
|
||||||
catch error:badarg -> binary_to_float(Bin)
|
catch error:badarg -> binary_to_float(Bin)
|
||||||
end.
|
end.
|
||||||
|
|
||||||
handle_output_list(Outputs, Selected, Envs) ->
|
handle_output_list(RuleId, Outputs, Selected, Envs) ->
|
||||||
[handle_output(Out, Selected, Envs) || Out <- Outputs].
|
[handle_output(RuleId, Out, Selected, Envs) || Out <- Outputs].
|
||||||
|
|
||||||
handle_output(OutId, Selected, Envs) ->
|
handle_output(RuleId, OutId, Selected, Envs) ->
|
||||||
try
|
try
|
||||||
do_handle_output(OutId, Selected, Envs)
|
do_handle_output(OutId, Selected, Envs)
|
||||||
catch
|
catch
|
||||||
Err:Reason:ST ->
|
Err:Reason:ST ->
|
||||||
?SLOG(error, #{msg => "output_failed",
|
ok = emqx_plugin_libs_metrics:inc_failed(rule_metrics, RuleId),
|
||||||
|
Level = case Err of throw -> debug; _ -> error end,
|
||||||
|
?SLOG(Level, #{msg => "output_failed",
|
||||||
output => OutId,
|
output => OutId,
|
||||||
exception => Err,
|
exception => Err,
|
||||||
reason => Reason,
|
reason => Reason,
|
||||||
|
|
Loading…
Reference in New Issue