Merge remote-tracking branch 'ce/main-v4.3' into merge-main-v4.3-into-v4.4

This commit is contained in:
JianBo He 2022-03-24 07:08:06 +08:00
commit 94efb190c6
10 changed files with 100 additions and 54 deletions

View File

@ -32,7 +32,9 @@ File format:
* Add load control app for future development. * Add load control app for future development.
* Change the precision of float to 17 digits after the decimal point when formatting a * Change the precision of float to 17 digits after the decimal point when formatting a
float using payload templates of rule actions. The old precision is 10 digits before float using payload templates of rule actions. The old precision is 10 digits before
this change. this change. [#7336]
* Return the cached resource status when querying a resource using HTTP APIs.
This is to avoid blocking the HTTP request if the resource is unavailable. [#7374]
### Bug fixes ### Bug fixes
@ -51,6 +53,7 @@ File format:
* Fix user or appid created, name only allow `^[A-Za-z]+[A-Za-z0-9-_]*$` * Fix user or appid created, name only allow `^[A-Za-z]+[A-Za-z0-9-_]*$`
* Fix subscribe http api crash by bad_qos `/mqtt/subscribe`,`/mqtt/subscribe_batch`. * Fix subscribe http api crash by bad_qos `/mqtt/subscribe`,`/mqtt/subscribe_batch`.
* Send DISCONNECT packet with reason code 0x98 if connection has been kicked [#7309] * Send DISCONNECT packet with reason code 0x98 if connection has been kicked [#7309]
* Auto subscribe to an empty topic will be simply ignored now
## v4.3.12 ## v4.3.12
### Important changes ### Important changes

View File

@ -1,6 +1,5 @@
%% -*-: erlang -*- %% -*-: erlang -*-
{VSN,
{"4.3.4",
[ [
{"4.3.3", [ {"4.3.3", [
{load_module, emqx_bridge_mqtt, brutal_purge, soft_purge, []} {load_module, emqx_bridge_mqtt, brutal_purge, soft_purge, []}

View File

@ -6,15 +6,19 @@
[{load_module,emqx_lwm2m_protocol,brutal_purge,soft_purge,[]}, [{load_module,emqx_lwm2m_protocol,brutal_purge,soft_purge,[]},
{load_module,emqx_lwm2m_message,brutal_purge,soft_purge,[]}, {load_module,emqx_lwm2m_message,brutal_purge,soft_purge,[]},
{load_module,emqx_lwm2m_api,brutal_purge,soft_purge,[]}]}, {load_module,emqx_lwm2m_api,brutal_purge,soft_purge,[]}]},
{<<"4\\.3\\.[3-5]">>, {<<"4\\.3\\.[3-4]">>,
[{load_module,emqx_lwm2m_protocol,brutal_purge,soft_purge,[]}, [{load_module,emqx_lwm2m_protocol,brutal_purge,soft_purge,[]},
{load_module,emqx_lwm2m_api,brutal_purge,soft_purge,[]}]}], {load_module,emqx_lwm2m_api,brutal_purge,soft_purge,[]}]},
{"4.3.5",
[{load_module,emqx_lwm2m_api,brutal_purge,soft_purge,[]}]}],
[{<<"4\\.3\\.[0-1]">>, [{<<"4\\.3\\.[0-1]">>,
[{restart_application,emqx_lwm2m}]}, [{restart_application,emqx_lwm2m}]},
{"4.3.2", {"4.3.2",
[{load_module,emqx_lwm2m_protocol,brutal_purge,soft_purge,[]}, [{load_module,emqx_lwm2m_protocol,brutal_purge,soft_purge,[]},
{load_module,emqx_lwm2m_message,brutal_purge,soft_purge,[]}, {load_module,emqx_lwm2m_message,brutal_purge,soft_purge,[]},
{load_module,emqx_lwm2m_api,brutal_purge,soft_purge,[]}]}, {load_module,emqx_lwm2m_api,brutal_purge,soft_purge,[]}]},
{<<"4\\.3\\.[3-5]">>, {<<"4\\.3\\.[3-4]">>,
[{load_module,emqx_lwm2m_protocol,brutal_purge,soft_purge,[]}, [{load_module,emqx_lwm2m_protocol,brutal_purge,soft_purge,[]},
{load_module,emqx_lwm2m_api,brutal_purge,soft_purge,[]}]}]}. {load_module,emqx_lwm2m_api,brutal_purge,soft_purge,[]}]},
{"4.3.5",
[{load_module,emqx_lwm2m_api,brutal_purge,soft_purge,[]}]}]}.

View File

@ -119,9 +119,10 @@ handle_request(_Method, _Path, Req) ->
cowboy_req:reply(400, #{<<"content-type">> => <<"text/plain">>}, <<"Not found.">>, Req). cowboy_req:reply(400, #{<<"content-type">> => <<"text/plain">>}, <<"Not found.">>, Req).
authorize_appid(Req) -> authorize_appid(Req) ->
case cowboy_req:parse_header(<<"authorization">>, Req) of try
{basic, AppId, AppSecret} -> emqx_mgmt_auth:is_authorized(AppId, AppSecret); {basic, AppId, AppSecret} = cowboy_req:parse_header(<<"authorization">>, Req),
_ -> false emqx_mgmt_auth:is_authorized(AppId, AppSecret)
catch _:_ -> false
end. end.
-ifdef(EMQX_ENTERPRISE). -ifdef(EMQX_ENTERPRISE).

View File

@ -334,14 +334,11 @@ test_resource(#{type := Type, config := Config0}) ->
-spec(get_resource_status(resource_id()) -> {ok, resource_status()} | {error, Reason :: term()}). -spec(get_resource_status(resource_id()) -> {ok, resource_status()} | {error, Reason :: term()}).
get_resource_status(ResId) -> get_resource_status(ResId) ->
case emqx_rule_registry:find_resource(ResId) of case emqx_rule_registry:find_resource_params(ResId) of
{ok, #resource{type = ResType}} -> {ok, #resource_params{status = Status}} ->
{ok, #resource_type{on_status = {Mod, OnStatus}}}
= emqx_rule_registry:find_resource_type(ResType),
Status = fetch_resource_status(Mod, OnStatus, ResId),
{ok, Status}; {ok, Status};
not_found -> not_found ->
{error, {resource_not_found, ResId}} {error, resource_not_initialized}
end. end.
-spec(get_resource_params(resource_id()) -> {ok, map()} | {error, Reason :: term()}). -spec(get_resource_params(resource_id()) -> {ok, map()} | {error, Reason :: term()}).

View File

@ -295,13 +295,8 @@ do_create_resource(Create, ParsedParams) ->
list_resources(#{}, _Params) -> list_resources(#{}, _Params) ->
Data0 = lists:foldr(fun maybe_record_to_map/2, [], emqx_rule_registry:get_resources()), Data0 = lists:foldr(fun maybe_record_to_map/2, [], emqx_rule_registry:get_resources()),
Data = lists:map(fun(Res = #{id := Id}) -> Data = lists:map(fun(Res = #{id := ResId}) ->
Status = lists:all(fun(Node) -> Status = get_aggregated_status(ResId),
case rpc:call(Node, emqx_rule_registry, find_resource_params, [Id]) of
{ok, #resource_params{status = #{is_alive := true}}} -> true;
_ -> false
end
end, ekka_mnesia:running_nodes()),
maps:put(status, Status, Res) maps:put(status, Status, Res)
end, Data0), end, Data0),
return({ok, Data}). return({ok, Data}).
@ -309,12 +304,23 @@ list_resources(#{}, _Params) ->
list_resources_by_type(#{type := Type}, _Params) -> list_resources_by_type(#{type := Type}, _Params) ->
return_all(emqx_rule_registry:get_resources_by_type(Type)). return_all(emqx_rule_registry:get_resources_by_type(Type)).
get_aggregated_status(ResId) ->
lists:all(fun(Node) ->
case rpc:call(Node, emqx_rule_engine, get_resource_status, [ResId]) of
{ok, #{is_alive := true}} -> true;
_ -> false
end
end, ekka_mnesia:running_nodes()).
show_resource(#{id := Id}, _Params) -> show_resource(#{id := Id}, _Params) ->
case emqx_rule_registry:find_resource(Id) of case emqx_rule_registry:find_resource(Id) of
{ok, R} -> {ok, R} ->
Status = Status =
[begin [begin
{ok, St} = rpc:call(Node, emqx_rule_engine, get_resource_status, [Id]), St = case rpc:call(Node, emqx_rule_engine, get_resource_status, [Id]) of
{ok, St0} -> St0;
{error, _} -> #{is_alive => false}
end,
maps:put(node, Node, St) maps:put(node, Node, St)
end || Node <- ekka_mnesia:running_nodes()], end || Node <- ekka_mnesia:running_nodes()],
return({ok, maps:put(status, Status, record_to_map(R))}); return({ok, maps:put(status, Status, record_to_map(R))});
@ -326,8 +332,8 @@ get_resource_status(#{id := Id}, _Params) ->
case emqx_rule_engine:get_resource_status(Id) of case emqx_rule_engine:get_resource_status(Id) of
{ok, Status} -> {ok, Status} ->
return({ok, Status}); return({ok, Status});
{error, {resource_not_found, ResId}} -> {error, resource_not_initialized} ->
return({error, 400, ?ERR_NO_RESOURCE(ResId)}) return({error, 400, ?ERR_NO_RESOURCE(Id)})
end. end.
start_resource(#{id := Id}, _Params) -> start_resource(#{id := Id}, _Params) ->

View File

@ -1,6 +1,6 @@
%% -*- mode: erlang -*- %% -*- mode: erlang -*-
{VSN, {VSN,
[{<<"4\\.3\\.[0-2]$">>, [{<<"4\\.3\\.[0-2]">>,
[{apply,{application,stop,[emqx_web_hook]}}, [{apply,{application,stop,[emqx_web_hook]}},
{load_module,emqx_web_hook_app,brutal_purge,soft_purge,[]}, {load_module,emqx_web_hook_app,brutal_purge,soft_purge,[]},
{load_module,emqx_web_hook,brutal_purge,soft_purge,[]}, {load_module,emqx_web_hook,brutal_purge,soft_purge,[]},
@ -14,9 +14,10 @@
[{load_module,emqx_web_hook,brutal_purge,soft_purge,[]}]}, [{load_module,emqx_web_hook,brutal_purge,soft_purge,[]}]},
{"4.3.9", {"4.3.9",
[ %% nothing so far [ %% nothing so far
%% 4.3.9 is taken by release 4.3.12
]}, ]},
{<<".*">>,[]}], {<<".*">>,[]}],
[{<<"4\\.3\\.[0-2]$">>, [{<<"4\\.3\\.[0-2]">>,
[{apply,{application,stop,[emqx_web_hook]}}, [{apply,{application,stop,[emqx_web_hook]}},
{load_module,emqx_web_hook_app,brutal_purge,soft_purge,[]}, {load_module,emqx_web_hook_app,brutal_purge,soft_purge,[]},
{load_module,emqx_web_hook,brutal_purge,soft_purge,[]}, {load_module,emqx_web_hook,brutal_purge,soft_purge,[]},
@ -30,5 +31,6 @@
[{load_module,emqx_web_hook,brutal_purge,soft_purge,[]}]}, [{load_module,emqx_web_hook,brutal_purge,soft_purge,[]}]},
{"4.3.9", {"4.3.9",
[ %% nothing so far [ %% nothing so far
%% 4.3.9 is taken by release 4.3.12
]}, ]},
{<<".*">>,[]}]}. {<<".*">>,[]}]}.

View File

@ -103,17 +103,17 @@ is_authorized(Req) ->
is_authorized("/api/v4/auth", _Req) -> is_authorized("/api/v4/auth", _Req) ->
true; true;
is_authorized(_Path, Req) -> is_authorized(_Path, Req) ->
case cowboy_req:parse_header(<<"authorization">>, Req) of try
{basic, Username, Password} -> {basic, Username, Password} = cowboy_req:parse_header(<<"authorization">>, Req),
case emqx_dashboard_admin:check(iolist_to_binary(Username), case emqx_dashboard_admin:check(iolist_to_binary(Username), iolist_to_binary(Password)) of
iolist_to_binary(Password)) of
ok -> true; ok -> true;
{error, Reason} -> {error, Reason} ->
?LOG(error, "[Dashboard] Authorization Failure: username=~s, reason=~p", ?LOG(error, "[Dashboard] Authorization Failure: username=~s, reason=~p",
[Username, Reason]), [Username, Reason]),
false false
end; end
_ -> false catch _:_ -> %% bad authorization header will crash.
false
end. end.
filter(#{app := emqx_plugin_libs}) -> true; filter(#{app := emqx_plugin_libs}) -> true;

View File

@ -20,6 +20,7 @@
-include_lib("emqx/include/emqx.hrl"). -include_lib("emqx/include/emqx.hrl").
-include_lib("emqx/include/emqx_mqtt.hrl"). -include_lib("emqx/include/emqx_mqtt.hrl").
-include_lib("emqx/include/logger.hrl").
%% emqx_gen_mod callbacks %% emqx_gen_mod callbacks
-export([ load/1 -export([ load/1
@ -38,14 +39,33 @@ load(Topics) ->
emqx_hooks:add('client.connected', {?MODULE, on_client_connected, [Topics]}). emqx_hooks:add('client.connected', {?MODULE, on_client_connected, [Topics]}).
on_client_connected(#{clientid := ClientId, username := Username}, _ConnInfo = #{proto_ver := ProtoVer}, Topics) -> on_client_connected(#{clientid := ClientId, username := Username}, _ConnInfo = #{proto_ver := ProtoVer}, Topics) ->
Replace = fun(Topic) ->
rep(<<"%u">>, Username, rep(<<"%c">>, ClientId, Topic)) OptFun = case ProtoVer of
?MQTT_PROTO_V5 -> fun(X) -> X end;
_ -> fun(#{qos := Qos}) -> #{qos => Qos} end
end, end,
TopicFilters = case ProtoVer of
?MQTT_PROTO_V5 -> [{Replace(Topic), SubOpts} || {Topic, SubOpts} <- Topics]; Fold = fun({Topic, SubOpts}, Acc) ->
_ -> [{Replace(Topic), #{qos => Qos}} || {Topic, #{qos := Qos}} <- Topics] case rep(Topic, ClientId, Username) of
{error, Reason} ->
?LOG(warning, "auto subscribe ignored, topic filter:~ts reason:~p~n",
[Topic, Reason]),
Acc;
<<>> ->
?LOG(warning, "auto subscribe ignored, topic filter:~ts"
" reason: topic can't be empty~n",
[Topic]),
Acc;
NewTopic ->
[{NewTopic, OptFun(SubOpts)} | Acc]
end
end, end,
self() ! {subscribe, TopicFilters}.
case lists:foldl(Fold, [], Topics) of
[] -> ok;
TopicFilters ->
self() ! {subscribe, TopicFilters}
end.
unload(_) -> unload(_) ->
emqx_hooks:del('client.connected', {?MODULE, on_client_connected}). emqx_hooks:del('client.connected', {?MODULE, on_client_connected}).
@ -56,10 +76,24 @@ description() ->
%% Internal functions %% Internal functions
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
rep(<<"%c">>, ClientId, Topic) -> rep(Topic, ClientId, Username) ->
emqx_topic:feed_var(<<"%c">>, ClientId, Topic); Words = emqx_topic:words(Topic),
rep(<<"%u">>, undefined, Topic) -> rep(Words, ClientId, Username, []).
Topic;
rep(<<"%u">>, Username, Topic) ->
emqx_topic:feed_var(<<"%u">>, Username, Topic).
rep([<<"%c">> | T], ClientId, Username, Acc) ->
rep(T,
ClientId,
Username,
[ClientId | Acc]);
rep([<<"%u">> | _], _, undefined, _) ->
{error, username_undefined};
rep([<<"%u">> | T], ClientId, Username, Acc) ->
rep(T,
ClientId,
Username,
[Username | Acc]);
rep([H | T], ClientId, UserName, Acc) ->
rep(T, ClientId, UserName, [H | Acc]);
rep([], _, _, Acc) ->
emqx_topic:join(lists:reverse(Acc)).

View File

@ -60,7 +60,7 @@ t_suboption(_) ->
Client_info = fun(Key, Client) -> maps:get(Key, maps:from_list(emqtt:info(Client)), undefined) end, Client_info = fun(Key, Client) -> maps:get(Key, maps:from_list(emqtt:info(Client)), undefined) end,
Suboption = #{qos => ?QOS_2, nl => 1, rap => 1, rh => 2}, Suboption = #{qos => ?QOS_2, nl => 1, rap => 1, rh => 2},
?assertEqual(ok, emqx_mod_subscription:load([{<<"connected/%c/%u">>, Suboption}])), ?assertEqual(ok, emqx_mod_subscription:load([{<<"connected/%c/%u">>, Suboption}])),
{ok, C1} = emqtt:start_link([{proto_ver, v5}]), {ok, C1} = emqtt:start_link([{proto_ver, v5}, {username, "admin"}]),
{ok, _} = emqtt:connect(C1), {ok, _} = emqtt:connect(C1),
timer:sleep(200), timer:sleep(200),
[CPid1] = emqx_cm:lookup_channels(Client_info(clientid, C1)), [CPid1] = emqx_cm:lookup_channels(Client_info(clientid, C1)),
@ -69,7 +69,7 @@ t_suboption(_) ->
?assertMatch({Sub1, #{qos := 2, nl := 1, rap := 1, rh := 2, subid := _}}, Suboption1), ?assertMatch({Sub1, #{qos := 2, nl := 1, rap := 1, rh := 2, subid := _}}, Suboption1),
ok = emqtt:disconnect(C1), ok = emqtt:disconnect(C1),
%% The subscription option is not valid for MQTT V3.1.1 %% The subscription option is not valid for MQTT V3.1.1
{ok, C2} = emqtt:start_link([{proto_ver, v4}]), {ok, C2} = emqtt:start_link([{proto_ver, v4}, {username, "admin"}]),
{ok, _} = emqtt:connect(C2), {ok, _} = emqtt:connect(C2),
timer:sleep(200), timer:sleep(200),
[CPid2] = emqx_cm:lookup_channels(Client_info(clientid, C2)), [CPid2] = emqx_cm:lookup_channels(Client_info(clientid, C2)),