diff --git a/CHANGES-4.3.md b/CHANGES-4.3.md index fe6a96ed4..2cfb0bfec 100644 --- a/CHANGES-4.3.md +++ b/CHANGES-4.3.md @@ -32,7 +32,9 @@ File format: * Add load control app for future development. * Change the precision of float to 17 digits after the decimal point when formatting a float using payload templates of rule actions. The old precision is 10 digits before - this change. + this change. [#7336] +* Return the cached resource status when querying a resource using HTTP APIs. + This is to avoid blocking the HTTP request if the resource is unavailable. [#7374] ### Bug fixes @@ -51,6 +53,7 @@ File format: * Fix user or appid created, name only allow `^[A-Za-z]+[A-Za-z0-9-_]*$` * Fix subscribe http api crash by bad_qos `/mqtt/subscribe`,`/mqtt/subscribe_batch`. * Send DISCONNECT packet with reason code 0x98 if connection has been kicked [#7309] +* Auto subscribe to an empty topic will be simply ignored now ## v4.3.12 ### Important changes diff --git a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt.appup.src b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt.appup.src index fdf2dec81..5aa48cea7 100644 --- a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt.appup.src +++ b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt.appup.src @@ -1,6 +1,5 @@ %% -*-: erlang -*- - -{"4.3.4", +{VSN, [ {"4.3.3", [ {load_module, emqx_bridge_mqtt, brutal_purge, soft_purge, []} diff --git a/apps/emqx_lwm2m/src/emqx_lwm2m.appup.src b/apps/emqx_lwm2m/src/emqx_lwm2m.appup.src index e9f270397..a25d6cae1 100644 --- a/apps/emqx_lwm2m/src/emqx_lwm2m.appup.src +++ b/apps/emqx_lwm2m/src/emqx_lwm2m.appup.src @@ -6,15 +6,19 @@ [{load_module,emqx_lwm2m_protocol,brutal_purge,soft_purge,[]}, {load_module,emqx_lwm2m_message,brutal_purge,soft_purge,[]}, {load_module,emqx_lwm2m_api,brutal_purge,soft_purge,[]}]}, - {<<"4\\.3\\.[3-5]">>, + {<<"4\\.3\\.[3-4]">>, [{load_module,emqx_lwm2m_protocol,brutal_purge,soft_purge,[]}, - {load_module,emqx_lwm2m_api,brutal_purge,soft_purge,[]}]}], + {load_module,emqx_lwm2m_api,brutal_purge,soft_purge,[]}]}, + {"4.3.5", + [{load_module,emqx_lwm2m_api,brutal_purge,soft_purge,[]}]}], [{<<"4\\.3\\.[0-1]">>, [{restart_application,emqx_lwm2m}]}, {"4.3.2", [{load_module,emqx_lwm2m_protocol,brutal_purge,soft_purge,[]}, {load_module,emqx_lwm2m_message,brutal_purge,soft_purge,[]}, {load_module,emqx_lwm2m_api,brutal_purge,soft_purge,[]}]}, - {<<"4\\.3\\.[3-5]">>, + {<<"4\\.3\\.[3-4]">>, [{load_module,emqx_lwm2m_protocol,brutal_purge,soft_purge,[]}, - {load_module,emqx_lwm2m_api,brutal_purge,soft_purge,[]}]}]}. + {load_module,emqx_lwm2m_api,brutal_purge,soft_purge,[]}]}, + {"4.3.5", + [{load_module,emqx_lwm2m_api,brutal_purge,soft_purge,[]}]}]}. diff --git a/apps/emqx_management/src/emqx_mgmt_http.erl b/apps/emqx_management/src/emqx_mgmt_http.erl index 010a4fce2..6290ac1ec 100644 --- a/apps/emqx_management/src/emqx_mgmt_http.erl +++ b/apps/emqx_management/src/emqx_mgmt_http.erl @@ -119,9 +119,10 @@ handle_request(_Method, _Path, Req) -> cowboy_req:reply(400, #{<<"content-type">> => <<"text/plain">>}, <<"Not found.">>, Req). authorize_appid(Req) -> - case cowboy_req:parse_header(<<"authorization">>, Req) of - {basic, AppId, AppSecret} -> emqx_mgmt_auth:is_authorized(AppId, AppSecret); - _ -> false + try + {basic, AppId, AppSecret} = cowboy_req:parse_header(<<"authorization">>, Req), + emqx_mgmt_auth:is_authorized(AppId, AppSecret) + catch _:_ -> false end. -ifdef(EMQX_ENTERPRISE). diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine.erl b/apps/emqx_rule_engine/src/emqx_rule_engine.erl index 2eed25647..c8e69a17f 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine.erl @@ -334,14 +334,11 @@ test_resource(#{type := Type, config := Config0}) -> -spec(get_resource_status(resource_id()) -> {ok, resource_status()} | {error, Reason :: term()}). get_resource_status(ResId) -> - case emqx_rule_registry:find_resource(ResId) of - {ok, #resource{type = ResType}} -> - {ok, #resource_type{on_status = {Mod, OnStatus}}} - = emqx_rule_registry:find_resource_type(ResType), - Status = fetch_resource_status(Mod, OnStatus, ResId), + case emqx_rule_registry:find_resource_params(ResId) of + {ok, #resource_params{status = Status}} -> {ok, Status}; not_found -> - {error, {resource_not_found, ResId}} + {error, resource_not_initialized} end. -spec(get_resource_params(resource_id()) -> {ok, map()} | {error, Reason :: term()}). diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl b/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl index 80de9be45..39ac1e9c2 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl @@ -295,13 +295,8 @@ do_create_resource(Create, ParsedParams) -> list_resources(#{}, _Params) -> Data0 = lists:foldr(fun maybe_record_to_map/2, [], emqx_rule_registry:get_resources()), - Data = lists:map(fun(Res = #{id := Id}) -> - Status = lists:all(fun(Node) -> - case rpc:call(Node, emqx_rule_registry, find_resource_params, [Id]) of - {ok, #resource_params{status = #{is_alive := true}}} -> true; - _ -> false - end - end, ekka_mnesia:running_nodes()), + Data = lists:map(fun(Res = #{id := ResId}) -> + Status = get_aggregated_status(ResId), maps:put(status, Status, Res) end, Data0), return({ok, Data}). @@ -309,12 +304,23 @@ list_resources(#{}, _Params) -> list_resources_by_type(#{type := Type}, _Params) -> return_all(emqx_rule_registry:get_resources_by_type(Type)). +get_aggregated_status(ResId) -> + lists:all(fun(Node) -> + case rpc:call(Node, emqx_rule_engine, get_resource_status, [ResId]) of + {ok, #{is_alive := true}} -> true; + _ -> false + end + end, ekka_mnesia:running_nodes()). + show_resource(#{id := Id}, _Params) -> case emqx_rule_registry:find_resource(Id) of {ok, R} -> Status = [begin - {ok, St} = rpc:call(Node, emqx_rule_engine, get_resource_status, [Id]), + St = case rpc:call(Node, emqx_rule_engine, get_resource_status, [Id]) of + {ok, St0} -> St0; + {error, _} -> #{is_alive => false} + end, maps:put(node, Node, St) end || Node <- ekka_mnesia:running_nodes()], return({ok, maps:put(status, Status, record_to_map(R))}); @@ -326,8 +332,8 @@ get_resource_status(#{id := Id}, _Params) -> case emqx_rule_engine:get_resource_status(Id) of {ok, Status} -> return({ok, Status}); - {error, {resource_not_found, ResId}} -> - return({error, 400, ?ERR_NO_RESOURCE(ResId)}) + {error, resource_not_initialized} -> + return({error, 400, ?ERR_NO_RESOURCE(Id)}) end. start_resource(#{id := Id}, _Params) -> diff --git a/apps/emqx_web_hook/src/emqx_web_hook.appup.src b/apps/emqx_web_hook/src/emqx_web_hook.appup.src index d0eb585f7..3f0980b3c 100644 --- a/apps/emqx_web_hook/src/emqx_web_hook.appup.src +++ b/apps/emqx_web_hook/src/emqx_web_hook.appup.src @@ -1,6 +1,6 @@ %% -*- mode: erlang -*- {VSN, - [{<<"4\\.3\\.[0-2]$">>, + [{<<"4\\.3\\.[0-2]">>, [{apply,{application,stop,[emqx_web_hook]}}, {load_module,emqx_web_hook_app,brutal_purge,soft_purge,[]}, {load_module,emqx_web_hook,brutal_purge,soft_purge,[]}, @@ -14,9 +14,10 @@ [{load_module,emqx_web_hook,brutal_purge,soft_purge,[]}]}, {"4.3.9", [ %% nothing so far + %% 4.3.9 is taken by release 4.3.12 ]}, {<<".*">>,[]}], - [{<<"4\\.3\\.[0-2]$">>, + [{<<"4\\.3\\.[0-2]">>, [{apply,{application,stop,[emqx_web_hook]}}, {load_module,emqx_web_hook_app,brutal_purge,soft_purge,[]}, {load_module,emqx_web_hook,brutal_purge,soft_purge,[]}, @@ -30,5 +31,6 @@ [{load_module,emqx_web_hook,brutal_purge,soft_purge,[]}]}, {"4.3.9", [ %% nothing so far + %% 4.3.9 is taken by release 4.3.12 ]}, {<<".*">>,[]}]}. diff --git a/lib-ce/emqx_dashboard/src/emqx_dashboard.erl b/lib-ce/emqx_dashboard/src/emqx_dashboard.erl index 59318a5a1..76b53c41c 100644 --- a/lib-ce/emqx_dashboard/src/emqx_dashboard.erl +++ b/lib-ce/emqx_dashboard/src/emqx_dashboard.erl @@ -103,17 +103,17 @@ is_authorized(Req) -> is_authorized("/api/v4/auth", _Req) -> true; is_authorized(_Path, Req) -> - case cowboy_req:parse_header(<<"authorization">>, Req) of - {basic, Username, Password} -> - case emqx_dashboard_admin:check(iolist_to_binary(Username), - iolist_to_binary(Password)) of - ok -> true; - {error, Reason} -> - ?LOG(error, "[Dashboard] Authorization Failure: username=~s, reason=~p", - [Username, Reason]), - false - end; - _ -> false + try + {basic, Username, Password} = cowboy_req:parse_header(<<"authorization">>, Req), + case emqx_dashboard_admin:check(iolist_to_binary(Username), iolist_to_binary(Password)) of + ok -> true; + {error, Reason} -> + ?LOG(error, "[Dashboard] Authorization Failure: username=~s, reason=~p", + [Username, Reason]), + false + end + catch _:_ -> %% bad authorization header will crash. + false end. filter(#{app := emqx_plugin_libs}) -> true; diff --git a/lib-ce/emqx_modules/src/emqx_mod_subscription.erl b/lib-ce/emqx_modules/src/emqx_mod_subscription.erl index 06178aee7..1b6a2c1c7 100644 --- a/lib-ce/emqx_modules/src/emqx_mod_subscription.erl +++ b/lib-ce/emqx_modules/src/emqx_mod_subscription.erl @@ -20,6 +20,7 @@ -include_lib("emqx/include/emqx.hrl"). -include_lib("emqx/include/emqx_mqtt.hrl"). +-include_lib("emqx/include/logger.hrl"). %% emqx_gen_mod callbacks -export([ load/1 @@ -38,14 +39,33 @@ load(Topics) -> emqx_hooks:add('client.connected', {?MODULE, on_client_connected, [Topics]}). on_client_connected(#{clientid := ClientId, username := Username}, _ConnInfo = #{proto_ver := ProtoVer}, Topics) -> - Replace = fun(Topic) -> - rep(<<"%u">>, Username, rep(<<"%c">>, ClientId, Topic)) + + OptFun = case ProtoVer of + ?MQTT_PROTO_V5 -> fun(X) -> X end; + _ -> fun(#{qos := Qos}) -> #{qos => Qos} end end, - TopicFilters = case ProtoVer of - ?MQTT_PROTO_V5 -> [{Replace(Topic), SubOpts} || {Topic, SubOpts} <- Topics]; - _ -> [{Replace(Topic), #{qos => Qos}} || {Topic, #{qos := Qos}} <- Topics] - end, - self() ! {subscribe, TopicFilters}. + + Fold = fun({Topic, SubOpts}, Acc) -> + case rep(Topic, ClientId, Username) of + {error, Reason} -> + ?LOG(warning, "auto subscribe ignored, topic filter:~ts reason:~p~n", + [Topic, Reason]), + Acc; + <<>> -> + ?LOG(warning, "auto subscribe ignored, topic filter:~ts" + " reason: topic can't be empty~n", + [Topic]), + Acc; + NewTopic -> + [{NewTopic, OptFun(SubOpts)} | Acc] + end + end, + + case lists:foldl(Fold, [], Topics) of + [] -> ok; + TopicFilters -> + self() ! {subscribe, TopicFilters} + end. unload(_) -> emqx_hooks:del('client.connected', {?MODULE, on_client_connected}). @@ -56,10 +76,24 @@ description() -> %% Internal functions %%-------------------------------------------------------------------- -rep(<<"%c">>, ClientId, Topic) -> - emqx_topic:feed_var(<<"%c">>, ClientId, Topic); -rep(<<"%u">>, undefined, Topic) -> - Topic; -rep(<<"%u">>, Username, Topic) -> - emqx_topic:feed_var(<<"%u">>, Username, Topic). +rep(Topic, ClientId, Username) -> + Words = emqx_topic:words(Topic), + rep(Words, ClientId, Username, []). +rep([<<"%c">> | T], ClientId, Username, Acc) -> + rep(T, + ClientId, + Username, + [ClientId | Acc]); +rep([<<"%u">> | _], _, undefined, _) -> + {error, username_undefined}; +rep([<<"%u">> | T], ClientId, Username, Acc) -> + rep(T, + ClientId, + Username, + [Username | Acc]); +rep([H | T], ClientId, UserName, Acc) -> + rep(T, ClientId, UserName, [H | Acc]); + +rep([], _, _, Acc) -> + emqx_topic:join(lists:reverse(Acc)). diff --git a/lib-ce/emqx_modules/test/emqx_mod_subscription_SUITE.erl b/lib-ce/emqx_modules/test/emqx_mod_subscription_SUITE.erl index c2905754b..89a2678ef 100644 --- a/lib-ce/emqx_modules/test/emqx_mod_subscription_SUITE.erl +++ b/lib-ce/emqx_modules/test/emqx_mod_subscription_SUITE.erl @@ -60,7 +60,7 @@ t_suboption(_) -> Client_info = fun(Key, Client) -> maps:get(Key, maps:from_list(emqtt:info(Client)), undefined) end, Suboption = #{qos => ?QOS_2, nl => 1, rap => 1, rh => 2}, ?assertEqual(ok, emqx_mod_subscription:load([{<<"connected/%c/%u">>, Suboption}])), - {ok, C1} = emqtt:start_link([{proto_ver, v5}]), + {ok, C1} = emqtt:start_link([{proto_ver, v5}, {username, "admin"}]), {ok, _} = emqtt:connect(C1), timer:sleep(200), [CPid1] = emqx_cm:lookup_channels(Client_info(clientid, C1)), @@ -69,7 +69,7 @@ t_suboption(_) -> ?assertMatch({Sub1, #{qos := 2, nl := 1, rap := 1, rh := 2, subid := _}}, Suboption1), ok = emqtt:disconnect(C1), %% The subscription option is not valid for MQTT V3.1.1 - {ok, C2} = emqtt:start_link([{proto_ver, v4}]), + {ok, C2} = emqtt:start_link([{proto_ver, v4}, {username, "admin"}]), {ok, _} = emqtt:connect(C2), timer:sleep(200), [CPid2] = emqx_cm:lookup_channels(Client_info(clientid, C2)),