Merge branch 'release-5.0-beta.3' into api-key-update-unexpired

This commit is contained in:
zhongwencool 2022-01-02 09:35:04 +08:00 committed by GitHub
commit 33523d9294
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
18 changed files with 307 additions and 101 deletions

View File

@ -153,9 +153,8 @@ t_destroy(_Config) ->
?GLOBAL),
% Authenticator should not be usable anymore
?assertException(
error,
_,
?assertMatch(
ignore,
emqx_authn_http:authenticate(
Credentials,
State)).

View File

@ -146,9 +146,8 @@ t_destroy(_Config) ->
?GLOBAL),
% Authenticator should not be usable anymore
?assertException(
error,
_,
?assertMatch(
ignore,
emqx_authn_mongodb:authenticate(
#{username => <<"plain">>,
password => <<"plain">>

View File

@ -159,9 +159,8 @@ t_destroy(_Config) ->
?GLOBAL),
% Authenticator should not be usable anymore
?assertException(
error,
_,
?assertMatch(
ignore,
emqx_authn_mysql:authenticate(
#{username => <<"plain">>,
password => <<"plain">>

View File

@ -159,9 +159,8 @@ t_destroy(_Config) ->
?GLOBAL),
% Authenticator should not be usable anymore
?assertException(
error,
_,
?assertMatch(
ignore,
emqx_authn_pgsql:authenticate(
#{username => <<"plain">>,
password => <<"plain">>

View File

@ -164,9 +164,8 @@ t_destroy(_Config) ->
?GLOBAL),
% Authenticator should not be usable anymore
?assertException(
error,
_,
?assertMatch(
ignore,
emqx_authn_redis:authenticate(
#{username => <<"plain">>,
password => <<"plain">>

View File

@ -39,11 +39,14 @@
, lookup/3
, list/0
, list_bridges_by_connector/1
, create/2
, create/3
, recreate/2
, recreate/3
, create_dry_run/2
, remove/1
, remove/3
, update/2
, update/3
, start/2
, stop/2
@ -80,17 +83,36 @@ unload_hook() ->
on_message_publish(Message = #message{topic = Topic, flags = Flags}) ->
case maps:get(sys, Flags, false) of
false ->
lists:foreach(fun (Id) ->
send_message(Id, emqx_rule_events:eventmsg_publish(Message))
end, get_matched_bridges(Topic));
Msg = emqx_rule_events:eventmsg_publish(Message),
send_to_matched_egress_bridges(Topic, Msg);
true -> ok
end,
{ok, Message}.
send_to_matched_egress_bridges(Topic, Msg) ->
lists:foreach(fun (Id) ->
try send_message(Id, Msg) of
ok -> ok;
Error -> ?SLOG(error, #{msg => "send_message_to_bridge_failed",
bridge => Id, error => Error})
catch Err:Reason:ST ->
?SLOG(error, #{msg => "send_message_to_bridge_crash",
bridge => Id, error => Err, reason => Reason,
stacktrace => ST})
end
end, get_matched_bridges(Topic)).
send_message(BridgeId, Message) ->
{BridgeType, BridgeName} = parse_bridge_id(BridgeId),
ResId = emqx_bridge:resource_id(BridgeType, BridgeName),
emqx_resource:query(ResId, {send_message, Message}).
case emqx:get_config([bridges, BridgeType, BridgeName], not_found) of
not_found ->
{error, {bridge_not_found, BridgeId}};
#{enable := true} ->
emqx_resource:query(ResId, {send_message, Message});
#{enable := false} ->
{error, {bridge_stopped, BridgeId}}
end.
config_key_path() ->
[bridges].
@ -188,6 +210,10 @@ stop(Type, Name) ->
restart(Type, Name) ->
emqx_resource:restart(resource_id(Type, Name)).
create(BridgeId, Conf) ->
{BridgeType, BridgeName} = parse_bridge_id(BridgeId),
create(BridgeType, BridgeName, Conf).
create(Type, Name, Conf) ->
?SLOG(info, #{msg => "create bridge", type => Type, name => Name,
config => Conf}),
@ -198,6 +224,10 @@ create(Type, Name, Conf) ->
{error, Reason} -> {error, Reason}
end.
update(BridgeId, {OldConf, Conf}) ->
{BridgeType, BridgeName} = parse_bridge_id(BridgeId),
update(BridgeType, BridgeName, {OldConf, Conf}).
update(Type, Name, {OldConf, Conf}) ->
%% TODO: sometimes its not necessary to restart the bridge connection.
%%
@ -244,6 +274,10 @@ create_dry_run(Type, Conf) ->
Error
end.
remove(BridgeId) ->
{BridgeType, BridgeName} = parse_bridge_id(BridgeId),
remove(BridgeType, BridgeName, #{}).
remove(Type, Name, _Conf) ->
?SLOG(info, #{msg => "remove_bridge", type => Type, name => Name}),
case emqx_resource:remove_local(resource_id(Type, Name)) of
@ -279,6 +313,8 @@ get_matched_bridges(Topic) ->
end, Acc0, Conf)
end, [], Bridges).
get_matched_bridge_id(#{enable := false}, _Topic, _BType, _BName, Acc) ->
Acc;
get_matched_bridge_id(#{local_topic := Filter}, Topic, BType, BName, Acc) ->
case emqx_topic:match(Topic, Filter) of
true -> [bridge_id(BType, BName) | Acc];
@ -309,21 +345,21 @@ parse_confs(Type, Name, #{connector := ConnId, direction := Direction} = Conf)
{Type, ConnName} ->
ConnectorConfs = emqx:get_config([connectors, Type, ConnName]),
make_resource_confs(Direction, ConnectorConfs,
maps:without([connector, direction], Conf), Name);
maps:without([connector, direction], Conf), Type, Name);
{_ConnType, _ConnName} ->
error({cannot_use_connector_with_different_type, ConnId})
end;
parse_confs(_Type, Name, #{connector := ConnectorConfs, direction := Direction} = Conf)
parse_confs(Type, Name, #{connector := ConnectorConfs, direction := Direction} = Conf)
when is_map(ConnectorConfs) ->
make_resource_confs(Direction, ConnectorConfs,
maps:without([connector, direction], Conf), Name).
maps:without([connector, direction], Conf), Type, Name).
make_resource_confs(ingress, ConnectorConfs, BridgeConf, Name) ->
BName = bin(Name),
make_resource_confs(ingress, ConnectorConfs, BridgeConf, Type, Name) ->
BName = bridge_id(Type, Name),
ConnectorConfs#{
ingress => BridgeConf#{hookpoint => <<"$bridges/", BName/binary>>}
};
make_resource_confs(egress, ConnectorConfs, BridgeConf, _Name) ->
make_resource_confs(egress, ConnectorConfs, BridgeConf, _Type, _Name) ->
ConnectorConfs#{
egress => BridgeConf
}.

View File

@ -68,7 +68,6 @@ How long will the HTTP request timeout.
fields("post") ->
[ type_field()
, name_field()
] ++ fields("bridge");
fields("put") ->
@ -103,8 +102,5 @@ id_field() ->
type_field() ->
{type, mk(http, #{desc => "The Bridge Type"})}.
name_field() ->
{name, mk(binary(), #{desc => "The Bridge Name"})}.
method() ->
enum([post, put, get, delete]).

View File

@ -24,11 +24,9 @@ fields("egress") ->
fields("post_ingress") ->
[ type_field()
, name_field()
] ++ proplists:delete(enable, fields("ingress"));
fields("post_egress") ->
[ type_field()
, name_field()
] ++ proplists:delete(enable, fields("egress"));
fields("put_ingress") ->
@ -49,9 +47,3 @@ id_field() ->
type_field() ->
{type, mk(mqtt, #{desc => "The Bridge Type"})}.
name_field() ->
{name, mk(binary(),
#{ desc => "The Bridge Name"
, example => "some_bridge_name"
})}.

View File

@ -68,10 +68,6 @@ fields("put") ->
fields("post") ->
[ {type, mk(mqtt, #{desc => "The Connector Type"})}
, {name, mk(binary(),
#{ desc => "The Connector Name"
, example => <<"my_mqtt_connector">>
})}
] ++ fields("put").
%% ===================================================================

View File

@ -165,7 +165,8 @@ handle_publish(Msg, undefined) ->
?SLOG(error, #{msg => "cannot_publish_to_local_broker_as"
"_'ingress'_is_not_configured",
message => Msg});
handle_publish(Msg, Vars) ->
handle_publish(Msg0, Vars) ->
Msg = format_msg_received(Msg0),
?SLOG(debug, #{msg => "publish_to_local_broker",
message => Msg, vars => Vars}),
case Vars of
@ -173,11 +174,7 @@ handle_publish(Msg, Vars) ->
_ = erlang:apply(Mod, Func, [Msg | Args]);
_ -> ok
end,
case maps:get(local_topic, Vars, undefined) of
undefined -> ok;
_Topic ->
emqx_broker:publish(emqx_connector_mqtt_msg:to_broker_msg(Msg, Vars))
end.
maybe_publish_to_local_broker(Msg0, Vars).
handle_disconnected(Reason, Parent) ->
Parent ! {disconnected, self(), Reason}.
@ -197,3 +194,45 @@ sub_remote_topics(ClientPid, #{remote_topic := FromTopic, remote_qos := QoS}) ->
process_config(Config) ->
maps:without([conn_type, address, receive_mountpoint, subscriptions, name], Config).
maybe_publish_to_local_broker(#{topic := Topic} = Msg, #{remote_topic := SubTopic} = Vars) ->
case maps:get(local_topic, Vars, undefined) of
undefined ->
ok; %% local topic is not set, discard it
_ ->
case emqx_topic:match(Topic, SubTopic) of
true ->
_ = emqx_broker:publish(emqx_connector_mqtt_msg:to_broker_msg(Msg, Vars)),
ok;
false ->
?SLOG(warning, #{msg => "discard_message_as_topic_not_matched",
message => Msg, subscribed => SubTopic, got_topic => Topic})
end
end.
format_msg_received(#{dup := Dup, payload := Payload, properties := Props,
qos := QoS, retain := Retain, topic := Topic}) ->
#{event => '$bridges/mqtt',
id => emqx_guid:to_hexstr(emqx_guid:gen()),
payload => Payload,
topic => Topic,
qos => QoS,
dup => Dup,
retain => Retain,
pub_props => printable_maps(Props),
timestamp => erlang:system_time(millisecond)
}.
printable_maps(undefined) -> #{};
printable_maps(Headers) ->
maps:fold(
fun ('User-Property', V0, AccIn) when is_list(V0) ->
AccIn#{
'User-Property' => maps:from_list(V0),
'User-Property-Pairs' => [#{
key => Key,
value => Value
} || {Key, Value} <- V0]
};
(K, V0, AccIn) -> AccIn#{K => V0}
end, #{}, Headers).

View File

@ -78,10 +78,9 @@ to_remote_msg(#message{topic = Topic} = Msg, #{mountpoint := Mountpoint}) ->
Msg#message{topic = topic(Mountpoint, Topic)}.
%% published from remote node over a MQTT connection
to_broker_msg(#{dup := Dup, properties := Props} = MapMsg0,
to_broker_msg(#{dup := Dup, properties := Props} = MapMsg,
#{local_topic := TopicToken, payload := PayloadToken,
local_qos := QoSToken, retain := RetainToken, mountpoint := Mountpoint}) ->
MapMsg = format_msg_received(MapMsg0),
Topic = replace_vars_in_str(TopicToken, MapMsg),
Payload = process_payload(PayloadToken, MapMsg),
QoS = replace_simple_var(QoSToken, MapMsg),
@ -90,33 +89,6 @@ to_broker_msg(#{dup := Dup, properties := Props} = MapMsg0,
emqx_message:set_flags(#{dup => Dup, retain => Retain},
emqx_message:make(bridge, QoS, topic(Mountpoint, Topic), Payload))).
format_msg_received(#{dup := Dup, payload := Payload, properties := Props,
qos := QoS, retain := Retain, topic := Topic}) ->
#{event => '$bridges/mqtt',
id => emqx_guid:to_hexstr(emqx_guid:gen()),
payload => Payload,
topic => Topic,
qos => QoS,
flags => #{dup => Dup, retain => Retain},
pub_props => printable_maps(Props),
timestamp => erlang:system_time(millisecond),
node => node()
}.
printable_maps(undefined) -> #{};
printable_maps(Headers) ->
maps:fold(
fun ('User-Property', V0, AccIn) when is_list(V0) ->
AccIn#{
'User-Property' => maps:from_list(V0),
'User-Property-Pairs' => [#{
key => Key,
value => Value
} || {Key, Value} <- V0]
};
(K, V0, AccIn) -> AccIn#{K => V0}
end, #{}, Headers).
process_payload([], Msg) ->
emqx_json:encode(Msg);
process_payload(Tks, Msg) ->

View File

@ -22,7 +22,10 @@
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
-define(CONF_DEFAULT, <<"connectors: {}">>).
%% output functions
-export([ inspect/3
]).
-define(BRIDGE_CONF_DEFAULT, <<"bridges: {}">>).
-define(CONNECTR_TYPE, <<"mqtt">>).
-define(CONNECTR_NAME, <<"test_connector">>).
@ -67,6 +70,9 @@
<<"failed">> := FAILED, <<"rate">> := SPEED,
<<"rate_last5m">> := SPEED5M, <<"rate_max">> := SPEEDMAX}).
inspect(Selected, _Envs, _Args) ->
persistent_term:put(?MODULE, #{inspect => Selected}).
all() ->
emqx_common_test_helpers:all(?MODULE).
@ -89,21 +95,35 @@ init_per_suite(Config) ->
%% some testcases (may from other app) already get emqx_connector started
_ = application:stop(emqx_resource),
_ = application:stop(emqx_connector),
ok = emqx_common_test_helpers:start_apps([emqx_connector, emqx_bridge, emqx_dashboard]),
ok = emqx_config:init_load(emqx_connector_schema, ?CONF_DEFAULT),
ok = emqx_common_test_helpers:start_apps([emqx_rule_engine, emqx_connector,
emqx_bridge, emqx_dashboard]),
ok = emqx_config:init_load(emqx_connector_schema, <<"connectors: {}">>),
ok = emqx_config:init_load(emqx_rule_engine_schema, <<"rule_engine {rules {}}">>),
ok = emqx_config:init_load(emqx_bridge_schema, ?BRIDGE_CONF_DEFAULT),
Config.
end_per_suite(_Config) ->
emqx_common_test_helpers:stop_apps([emqx_connector, emqx_bridge, emqx_dashboard]),
emqx_common_test_helpers:stop_apps([emqx_rule_engine, emqx_connector, emqx_bridge, emqx_dashboard]),
ok.
init_per_testcase(_, Config) ->
{ok, _} = emqx_cluster_rpc:start_link(node(), emqx_cluster_rpc, 1000),
Config.
end_per_testcase(_, _Config) ->
clear_resources(),
ok.
clear_resources() ->
lists:foreach(fun(#{id := Id}) ->
ok = emqx_rule_engine:delete_rule(Id)
end, emqx_rule_engine:get_rules()),
lists:foreach(fun(#{id := Id}) ->
ok = emqx_bridge:remove(Id)
end, emqx_bridge:list()),
lists:foreach(fun(#{<<"id">> := Id}) ->
ok = emqx_connector:delete(Id)
end, emqx_connector:list()).
%%------------------------------------------------------------------------------
%% Testcases
%%------------------------------------------------------------------------------
@ -223,7 +243,6 @@ t_mqtt_conn_bridge_ingress(_) ->
%% PUBLISH a message to the 'remote' broker, as we have only one broker,
%% the remote broker is also the local one.
emqx:publish(emqx_message:make(RemoteTopic, Payload)),
%% we should receive a message on the local broker, with specified topic
?assert(
receive
@ -435,6 +454,150 @@ t_mqtt_conn_testing(_) ->
<<"name">> => ?BRIDGE_NAME_EGRESS
}).
t_ingress_mqtt_bridge_with_rules(_) ->
{ok, 201, Connector} = request(post, uri(["connectors"]),
?MQTT_CONNECOTR(<<"user1">>)#{ <<"type">> => ?CONNECTR_TYPE
, <<"name">> => ?CONNECTR_NAME
}),
#{ <<"id">> := ConnctorID } = jsx:decode(Connector),
{ok, 201, Bridge} = request(post, uri(["bridges"]),
?MQTT_BRIDGE_INGRESS(ConnctorID)#{
<<"type">> => ?CONNECTR_TYPE,
<<"name">> => ?BRIDGE_NAME_INGRESS
}),
#{ <<"id">> := BridgeIDIngress } = jsx:decode(Bridge),
{ok, 201, Rule} = request(post, uri(["rules"]),
#{<<"name">> => <<"A rule get messages from a source mqtt bridge">>,
<<"enable">> => true,
<<"outputs">> => [#{<<"function">> => "emqx_connector_api_SUITE:inspect"}],
<<"sql">> => <<"SELECT * from \"$bridges/", BridgeIDIngress/binary, "\"">>
}),
#{<<"id">> := RuleId} = jsx:decode(Rule),
%% we now test if the bridge works as expected
RemoteTopic = <<"remote_topic/1">>,
LocalTopic = <<"local_topic/", RemoteTopic/binary>>,
Payload = <<"hello">>,
emqx:subscribe(LocalTopic),
%% PUBLISH a message to the 'remote' broker, as we have only one broker,
%% the remote broker is also the local one.
emqx:publish(emqx_message:make(RemoteTopic, Payload)),
%% we should receive a message on the local broker, with specified topic
?assert(
receive
{deliver, LocalTopic, #message{payload = Payload}} ->
ct:pal("local broker got message: ~p on topic ~p", [Payload, LocalTopic]),
true;
Msg ->
ct:pal("Msg: ~p", [Msg]),
false
after 100 ->
false
end),
%% and also the rule should be matched, with matched + 1:
{ok, 200, Rule1} = request(get, uri(["rules", RuleId]), []),
#{ <<"id">> := RuleId
, <<"metrics">> := #{<<"matched">> := 1}
} = jsx:decode(Rule1),
%% we also check if the outputs of the rule is triggered
?assertMatch(#{inspect := #{
event := '$bridges/mqtt',
id := MsgId,
payload := Payload,
topic := RemoteTopic,
qos := 0,
dup := false,
retain := false,
pub_props := #{},
timestamp := _
}} when is_binary(MsgId), persistent_term:get(?MODULE)),
{ok, 204, <<>>} = request(delete, uri(["rules", RuleId]), []),
{ok, 204, <<>>} = request(delete, uri(["bridges", BridgeIDIngress]), []),
{ok, 204, <<>>} = request(delete, uri(["connectors", ConnctorID]), []).
t_egress_mqtt_bridge_with_rules(_) ->
{ok, 201, Connector} = request(post, uri(["connectors"]),
?MQTT_CONNECOTR(<<"user1">>)#{ <<"type">> => ?CONNECTR_TYPE
, <<"name">> => ?CONNECTR_NAME
}),
#{ <<"id">> := ConnctorID } = jsx:decode(Connector),
{ok, 201, Bridge} = request(post, uri(["bridges"]),
?MQTT_BRIDGE_EGRESS(ConnctorID)#{
<<"type">> => ?CONNECTR_TYPE,
<<"name">> => ?BRIDGE_NAME_EGRESS
}),
#{ <<"id">> := BridgeIDEgress } = jsx:decode(Bridge),
{ok, 201, Rule} = request(post, uri(["rules"]),
#{<<"name">> => <<"A rule send messages to a sink mqtt bridge">>,
<<"enable">> => true,
<<"outputs">> => [BridgeIDEgress],
<<"sql">> => <<"SELECT * from \"t/1\"">>
}),
#{<<"id">> := RuleId} = jsx:decode(Rule),
%% we now test if the bridge works as expected
LocalTopic = <<"local_topic/1">>,
RemoteTopic = <<"remote_topic/", LocalTopic/binary>>,
Payload = <<"hello">>,
emqx:subscribe(RemoteTopic),
%% PUBLISH a message to the 'local' broker, as we have only one broker,
%% the remote broker is also the local one.
emqx:publish(emqx_message:make(LocalTopic, Payload)),
%% we should receive a message on the "remote" broker, with specified topic
?assert(
receive
{deliver, RemoteTopic, #message{payload = Payload}} ->
ct:pal("local broker got message: ~p on topic ~p", [Payload, RemoteTopic]),
true;
Msg ->
ct:pal("Msg: ~p", [Msg]),
false
after 100 ->
false
end),
emqx:unsubscribe(RemoteTopic),
%% PUBLISH a message to the rule.
Payload2 = <<"hi">>,
RuleTopic = <<"t/1">>,
RemoteTopic2 = <<"remote_topic/", RuleTopic/binary>>,
emqx:subscribe(RemoteTopic2),
emqx:publish(emqx_message:make(RuleTopic, Payload2)),
{ok, 200, Rule1} = request(get, uri(["rules", RuleId]), []),
#{ <<"id">> := RuleId
, <<"metrics">> := #{<<"matched">> := 1}
} = jsx:decode(Rule1),
%% we should receive a message on the "remote" broker, with specified topic
?assert(
receive
{deliver, RemoteTopic2, #message{payload = Payload2}} ->
ct:pal("local broker got message: ~p on topic ~p", [Payload2, RemoteTopic2]),
true;
Msg ->
ct:pal("Msg: ~p", [Msg]),
false
after 100 ->
false
end),
%% verify the metrics of the bridge
{ok, 200, BridgeStr} = request(get, uri(["bridges", BridgeIDEgress]), []),
?assertMatch(#{ <<"id">> := BridgeIDEgress
, <<"metrics">> := ?metrics(2, 2, 0, _, _, _)
, <<"node_metrics">> :=
[#{<<"node">> := _, <<"metrics">> := ?metrics(2, 2, 0, _, _, _)}]
}, jsx:decode(BridgeStr)),
{ok, 204, <<>>} = request(delete, uri(["rules", RuleId]), []),
{ok, 204, <<>>} = request(delete, uri(["bridges", BridgeIDEgress]), []),
{ok, 204, <<>>} = request(delete, uri(["connectors", ConnctorID]), []).
%%--------------------------------------------------------------------
%% HTTP Request
%%--------------------------------------------------------------------

View File

@ -25,7 +25,7 @@
mod := module(),
config := resource_config(),
state := resource_state(),
status := started | stopped,
status := started | stopped | starting,
metrics := emqx_plugin_libs_metrics:metrics()
}.
-type resource_group() :: binary().
@ -41,3 +41,5 @@
%% the `after_query_fun()` is mainly for callbacks that increment counters or do some fallback
%% actions upon query failure
-type after_query_fun() :: {fun((...) -> ok), Args :: [term()]}.
-define(TEST_ID_PREFIX, "_test_:").

View File

@ -82,7 +82,6 @@
]).
-define(HOCON_CHECK_OPTS, #{atom_key => true, nullable => true}).
-define(DEFAULT_RESOURCE_GROUP, <<"default">>).
-optional_callbacks([ on_query/4
@ -170,7 +169,7 @@ create_dry_run(ResourceType, Config) ->
-spec create_dry_run_local(resource_type(), resource_config()) ->
ok | {error, Reason :: term()}.
create_dry_run_local(ResourceType, Config) ->
InstId = iolist_to_binary(emqx_misc:gen_id(16)),
InstId = emqx_resource_instance:make_test_id(),
call_instance(InstId, {create_dry_run, InstId, ResourceType, Config}).
-spec recreate(instance_id(), resource_type(), resource_config(), term()) ->
@ -201,14 +200,18 @@ query(InstId, Request) ->
-spec query(instance_id(), Request :: term(), after_query()) -> Result :: term().
query(InstId, Request, AfterQuery) ->
case get_instance(InstId) of
{ok, #{status := starting}} ->
query_error(starting, <<"cannot serve query when the resource "
"instance is still starting">>);
{ok, #{status := stopped}} ->
error({resource_stopped, InstId});
query_error(stopped, <<"cannot serve query when the resource "
"instance is stopped">>);
{ok, #{mod := Mod, state := ResourceState, status := started}} ->
%% the resource state is readonly to Module:on_query/4
%% and the `after_query()` functions should be thread safe
Mod:on_query(InstId, Request, AfterQuery, ResourceState);
{error, Reason} ->
error({get_instance, {InstId, Reason}})
{error, not_found} ->
query_error(not_found, <<"the resource id not exists">>)
end.
-spec restart(instance_id()) -> ok | {error, Reason :: term()}.
@ -368,3 +371,6 @@ cluster_call(Func, Args) ->
{ok, _TxnId, Result} -> Result;
Failed -> Failed
end.
query_error(Reason, Msg) ->
{error, {?MODULE, #{reason => Reason, msg => Msg}}}.

View File

@ -26,6 +26,7 @@
-export([ lookup/1
, get_metrics/1
, list_all/0
, make_test_id/0
]).
-export([ hash_call/2
@ -61,7 +62,7 @@ hash_call(InstId, Request) ->
hash_call(InstId, Request, Timeout) ->
gen_server:call(pick(InstId), Request, Timeout).
-spec lookup(instance_id()) -> {ok, resource_data()} | {error, Reason :: term()}.
-spec lookup(instance_id()) -> {ok, resource_data()} | {error, not_found}.
lookup(InstId) ->
case ets:lookup(emqx_resource_instance, InstId) of
[] -> {error, not_found};
@ -69,6 +70,10 @@ lookup(InstId) ->
{ok, Data#{id => InstId, metrics => get_metrics(InstId)}}
end.
make_test_id() ->
RandId = iolist_to_binary(emqx_misc:gen_id(16)),
<<?TEST_ID_PREFIX, RandId/binary>>.
get_metrics(InstId) ->
emqx_plugin_libs_metrics:get_metrics(resource_metrics, InstId).
@ -146,7 +151,7 @@ do_recreate(InstId, ResourceType, NewConfig, Params) ->
{ok, #{mod := ResourceType, state := ResourceState, config := OldConfig}} ->
Config = emqx_resource:call_config_merge(ResourceType, OldConfig,
NewConfig, Params),
TestInstId = iolist_to_binary(emqx_misc:gen_id(16)),
TestInstId = make_test_id(),
case do_create_dry_run(TestInstId, ResourceType, Config) of
ok ->
do_remove(ResourceType, InstId, ResourceState, false),
@ -166,7 +171,9 @@ do_create(InstId, ResourceType, Config, Opts) ->
{ok, _} -> {ok, already_created};
_ ->
Res0 = #{id => InstId, mod => ResourceType, config => Config,
status => stopped, state => undefined},
status => starting, state => undefined},
%% The `emqx_resource:call_start/3` need the instance exist beforehand
ets:insert(emqx_resource_instance, {InstId, Res0}),
case emqx_resource:call_start(InstId, ResourceType, Config) of
{ok, ResourceState} ->
ok = emqx_plugin_libs_metrics:create_metrics(resource_metrics, InstId),
@ -181,6 +188,7 @@ do_create(InstId, ResourceType, Config, Opts) ->
ets:insert(emqx_resource_instance, {InstId, Res0}),
{ok, Res0};
{error, Reason} when ForceCreate == false ->
ets:delete(emqx_resource_instance, InstId),
{error, Reason}
end
end.

View File

@ -96,9 +96,7 @@ t_query(_) ->
?assert(false)
end,
?assertException(
error,
{get_instance, _Reason},
?assertMatch({error, {emqx_resource, #{reason := not_found}}},
emqx_resource:query(<<"unknown">>, get_state)),
ok = emqx_resource:remove_local(?ID).
@ -142,7 +140,8 @@ t_stop_start(_) ->
?assertNot(is_process_alive(Pid0)),
?assertException(error, {resource_stopped, ?ID}, emqx_resource:query(?ID, get_state)),
?assertMatch({error, {emqx_resource, #{reason := stopped}}},
emqx_resource:query(?ID, get_state)),
ok = emqx_resource:restart(?ID),

View File

@ -101,7 +101,7 @@ do_apply_rule(#{
true ->
ok = emqx_plugin_libs_metrics:inc_matched(rule_metrics, RuleId),
Collection2 = filter_collection(Input, InCase, DoEach, Collection),
{ok, [handle_output_list(Outputs, Coll, Input) || Coll <- Collection2]};
{ok, [handle_output_list(RuleId, Outputs, Coll, Input) || Coll <- Collection2]};
false ->
{error, nomatch}
end;
@ -118,7 +118,7 @@ do_apply_rule(#{id := RuleId,
{match_conditions_error, {_EXCLASS_,_EXCPTION_,_ST_}}) of
true ->
ok = emqx_plugin_libs_metrics:inc_matched(rule_metrics, RuleId),
{ok, handle_output_list(Outputs, Selected, Input)};
{ok, handle_output_list(RuleId, Outputs, Selected, Input)};
false ->
{error, nomatch}
end.
@ -231,15 +231,17 @@ number(Bin) ->
catch error:badarg -> binary_to_float(Bin)
end.
handle_output_list(Outputs, Selected, Envs) ->
[handle_output(Out, Selected, Envs) || Out <- Outputs].
handle_output_list(RuleId, Outputs, Selected, Envs) ->
[handle_output(RuleId, Out, Selected, Envs) || Out <- Outputs].
handle_output(OutId, Selected, Envs) ->
handle_output(RuleId, OutId, Selected, Envs) ->
try
do_handle_output(OutId, Selected, Envs)
catch
Err:Reason:ST ->
?SLOG(error, #{msg => "output_failed",
ok = emqx_plugin_libs_metrics:inc_failed(rule_metrics, RuleId),
Level = case Err of throw -> debug; _ -> error end,
?SLOG(Level, #{msg => "output_failed",
output => OutId,
exception => Err,
reason => Reason,

View File

@ -59,13 +59,13 @@ statsd(put, #{body := Body}) ->
Body,
#{rawconf_with_defaults => true, override_to => cluster}) of
{ok, #{raw_config := NewConfig, config := Config}} ->
_ = emqx_statsd_sup:stop_child(?APP),
case maps:get(<<"enable">>, Body) of
true ->
ok = emqx_statsd_sup:ensure_child_stopped(?APP),
ok = emqx_statsd_sup:ensure_child_started(?APP, maps:get(config, Config));
false ->
ok = emqx_statsd_sup:ensure_child_stopped(?APP)
end,
{200, NewConfig};
{error, Reason} ->
Message = list_to_binary(io_lib:format("Update config failed ~p", [Reason])),