diff --git a/CHANGES-4.3.md b/CHANGES-4.3.md index 45bcea72e..57d921c16 100644 --- a/CHANGES-4.3.md +++ b/CHANGES-4.3.md @@ -18,10 +18,13 @@ File format: management listener. [#8411] - Fixed crash when shared persistent subscription [#8441] +- Fixed issue in Lua hook that prevented messages from being + rejected [#8535] ### Enhancements - HTTP API(GET /rules/) support for pagination and fuzzy filtering. [#8450] - Add check_conf cli to check config format. [#8486] +- Optimize performance of shared subscription ## v4.3.16 diff --git a/apps/emqx_lua_hook/src/emqx_lua_hook.app.src b/apps/emqx_lua_hook/src/emqx_lua_hook.app.src index 6f60c4a1a..550981b70 100644 --- a/apps/emqx_lua_hook/src/emqx_lua_hook.app.src +++ b/apps/emqx_lua_hook/src/emqx_lua_hook.app.src @@ -1,6 +1,6 @@ {application, emqx_lua_hook, [{description, "EMQ X Lua Hooks"}, - {vsn, "4.3.1"}, % strict semver, bump manually! + {vsn, "4.3.2"}, % strict semver, bump manually! {modules, []}, {registered, []}, {applications, [kernel,stdlib]}, diff --git a/apps/emqx_lua_hook/src/emqx_lua_hook.appup.src b/apps/emqx_lua_hook/src/emqx_lua_hook.appup.src index 7e91ac8c4..c4178c620 100644 --- a/apps/emqx_lua_hook/src/emqx_lua_hook.appup.src +++ b/apps/emqx_lua_hook/src/emqx_lua_hook.appup.src @@ -1,5 +1,7 @@ %% -*- mode: erlang -*- %% Unless you know what you are doing, DO NOT edit manually!! {VSN, - [{"4.3.0",[{load_module,emqx_lua_script,brutal_purge,soft_purge,[]}]}], - [{"4.3.0",[{load_module,emqx_lua_script,brutal_purge,soft_purge,[]}]}]}. + [{"4.3.1",[{load_module,emqx_lua_script,brutal_purge,soft_purge,[]}]}, + {"4.3.0",[{load_module,emqx_lua_script,brutal_purge,soft_purge,[]}]}], + [{"4.3.1",[{load_module,emqx_lua_script,brutal_purge,soft_purge,[]}]}, + {"4.3.0",[{load_module,emqx_lua_script,brutal_purge,soft_purge,[]}]}]}. diff --git a/apps/emqx_lua_hook/src/emqx_lua_script.erl b/apps/emqx_lua_hook/src/emqx_lua_script.erl index f5629309f..b5f544916 100644 --- a/apps/emqx_lua_hook/src/emqx_lua_script.erl +++ b/apps/emqx_lua_hook/src/emqx_lua_script.erl @@ -283,13 +283,14 @@ on_message_publish(Message = #message{from = ClientId, ?LOG(error, "Failed to execute function on_message_publish(), which has syntax error, St=~p", [St]), {ok, Message}; {[false], _St} -> - {stop, Message}; + ?LOG(debug, "Lua function on_message_publish() returned false, setting allow_publish header to false", []), + {stop, Message#message{headers = Headers#{allow_publish => false}}}; {[NewTopic, NewPayload, NewQos, NewRetain], _St} -> - ?LOG(debug, "Lua function on_message_publish() return ~p", [{NewTopic, NewPayload, NewQos, NewRetain}]), + ?LOG(debug, "Lua function on_message_publish() returned ~p", [{NewTopic, NewPayload, NewQos, NewRetain}]), {ok, Message#message{topic = NewTopic, payload = NewPayload, qos = round(NewQos), flags = Flags#{retain => to_retain(NewRetain)}}}; Other -> - ?LOG(error, "Topic=~p, lua function on_message_publish caught exception, ~p", [Topic, Other]), + ?LOG(error, "Topic=~p, lua function on_message_publish() caught exception, ~p", [Topic, Other]), {ok, Message} end. diff --git a/apps/emqx_lua_hook/test/emqx_lua_hook_SUITE.erl b/apps/emqx_lua_hook/test/emqx_lua_hook_SUITE.erl index 9b9f7c410..bb9247464 100644 --- a/apps/emqx_lua_hook/test/emqx_lua_hook_SUITE.erl +++ b/apps/emqx_lua_hook/test/emqx_lua_hook_SUITE.erl @@ -92,7 +92,7 @@ case02(_Config) -> Msg = #message{from = <<"myclient">>, qos = 2, flags = #{retain => true}, topic = <<"a/b/c">>, payload = <<"123">>, headers = #{username => <<"tester">>}}, Ret = emqx_hooks:run_fold('message.publish',[], Msg), - ?assertEqual(Msg, Ret). + ?assertEqual(Msg#message{headers = #{username => <<"tester">>, allow_publish => false}}, Ret). case03(_Config) -> ScriptName = filename:join([emqx_lua_hook:lua_dir(), "abc.lua"]), diff --git a/lib-ce/emqx_modules/src/emqx_mod_rewrite.erl b/lib-ce/emqx_modules/src/emqx_mod_rewrite.erl index c8bd3a967..9f0f635d9 100644 --- a/lib-ce/emqx_modules/src/emqx_mod_rewrite.erl +++ b/lib-ce/emqx_modules/src/emqx_mod_rewrite.erl @@ -46,7 +46,7 @@ load(RawRules) -> {PubRules, SubRules} = compile(RawRules), - ?LOG(info, "[Rewrite] Load rule pub ~0p sub ~0p", [PubRules, SubRules]), + log_start(RawRules), emqx_hooks:put('client.subscribe', {?MODULE, rewrite_subscribe, [SubRules]}, 1000), emqx_hooks:put('client.unsubscribe', {?MODULE, rewrite_unsubscribe, [SubRules]}, 1000), emqx_hooks:put('message.publish', {?MODULE, rewrite_publish, [PubRules]}, 1000). @@ -75,6 +75,20 @@ description() -> %% Internal functions %%-------------------------------------------------------------------- +log_start(Rules) -> + PubRules = [{pub, Topic, Re, Dest} || {rewrite, pub, Topic, Re, Dest} <- Rules], + SubRules = [{sub, Topic, Re, Dest} || {rewrite, sub, Topic, Re, Dest} <- Rules], + ?LOG(info, "[Rewrite] Load: pub rules count ~p sub rules count ~p", + [erlang:length(PubRules), erlang:length(SubRules)]), + log_rule(PubRules, 1), + log_rule(SubRules, 1). + +log_rule([], _Index) -> ok; +log_rule([{Type, Topic, Re, Dest} | Rules], Index) -> + ?LOG(info, "[Rewrite] Load ~p rule[~p]: source: ~ts, re: ~ts, dest: ~ts", + [Type, Index, Topic, Re, Dest]), + log_rule(Rules, Index + 1). + compile(Rules) -> PubRules = [ begin {ok, MP} = re:compile(Re), diff --git a/src/emqx.appup.src b/src/emqx.appup.src index fe4cb32c3..d6d20aa0f 100644 --- a/src/emqx.appup.src +++ b/src/emqx.appup.src @@ -7,7 +7,8 @@ {update,emqx_broker_sup,supervisor}, {load_module,emqx_app,brutal_purge,soft_purge,[]}, {load_module,emqx_access_control,brutal_purge,soft_purge,[]}, - {load_module,emqx_shared_sub,brutal_purge,soft_purge,[]}]}, + {load_module,emqx_shared_sub,brutal_purge,soft_purge,[]}, + {load_module,emqx_session,brutal_purge,soft_purge,[]}]}, {"4.4.4", [{load_module,emqx_access_control,brutal_purge,soft_purge,[]}, {update,emqx_broker_sup,supervisor}, @@ -23,7 +24,8 @@ {load_module,emqx_relup,brutal_purge,soft_purge,[]}, {load_module,emqx_app,brutal_purge,soft_purge,[]}, {load_module,emqx_plugins,brutal_purge,soft_purge,[]}, - {load_module,emqx_metrics,brutal_purge,soft_purge,[]}]}, + {load_module,emqx_metrics,brutal_purge,soft_purge,[]}, + {load_module,emqx_session,brutal_purge,soft_purge,[]}]}, {"4.4.3", [{add_module,emqx_calendar}, {update,emqx_broker_sup,supervisor}, @@ -163,6 +165,7 @@ {load_module,emqx_relup,brutal_purge,soft_purge,[]}, {load_module,emqx_app,brutal_purge,soft_purge,[]}, {load_module,emqx_access_control,brutal_purge,soft_purge,[]}, + {load_module,emqx_session,brutal_purge,soft_purge,[]}, {load_module,emqx_shared_sub,brutal_purge,soft_purge,[]}]}, {"4.4.4", [{load_module,emqx_access_control,brutal_purge,soft_purge,[]}, @@ -179,6 +182,7 @@ {load_module,emqx_relup,brutal_purge,soft_purge,[]}, {load_module,emqx_app,brutal_purge,soft_purge,[]}, {load_module,emqx_plugins,brutal_purge,soft_purge,[]}, + {load_module,emqx_session,brutal_purge,soft_purge,[]}, {load_module,emqx_metrics,brutal_purge,soft_purge,[]}]}, {"4.4.3", [{load_module,emqx_broker,brutal_purge,soft_purge,[]}, @@ -271,6 +275,7 @@ {delete_module,emqx_relup}]}, {"4.4.0", [{load_module,emqx_broker,brutal_purge,soft_purge,[]}, + {load_module,emqx_message,brutal_purge,soft_purge,[]}, {update,emqx_broker_sup,supervisor}, {load_module,emqx_mqtt_caps,brutal_purge,soft_purge,[]}, {load_module,emqx_topic,brutal_purge,soft_purge,[]}, diff --git a/src/emqx_session.erl b/src/emqx_session.erl index 84d72d177..1d300c743 100644 --- a/src/emqx_session.erl +++ b/src/emqx_session.erl @@ -672,7 +672,8 @@ replay(ClientInfo, Session = #session{inflight = Inflight}) -> -spec(terminate(emqx_types:clientinfo(), Reason :: term(), session()) -> ok). terminate(ClientInfo, Reason, Session) -> run_terminate_hooks(ClientInfo, Reason, Session), - redispatch_shared_messages(Session), + Reason =/= takeovered andalso + redispatch_shared_messages(Session), ok. run_terminate_hooks(ClientInfo, discarded, Session) ->