Merge pull request #8529 from ieQu1/copy-of-main-4.3

Sync 4.3 changes
This commit is contained in:
zhongwencool 2022-07-25 09:41:39 +08:00 committed by GitHub
commit 9c41ddaf3b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 37 additions and 11 deletions

View File

@ -18,10 +18,13 @@ File format:
management listener. [#8411]
- Fixed crash when shared persistent subscription [#8441]
- Fixed issue in Lua hook that prevented messages from being
rejected [#8535]
### Enhancements
- HTTP API(GET /rules/) support for pagination and fuzzy filtering. [#8450]
- Add check_conf cli to check config format. [#8486]
- Optimize performance of shared subscription
## v4.3.16

View File

@ -1,6 +1,6 @@
{application, emqx_lua_hook,
[{description, "EMQ X Lua Hooks"},
{vsn, "4.3.1"}, % strict semver, bump manually!
{vsn, "4.3.2"}, % strict semver, bump manually!
{modules, []},
{registered, []},
{applications, [kernel,stdlib]},

View File

@ -1,5 +1,7 @@
%% -*- mode: erlang -*-
%% Unless you know what you are doing, DO NOT edit manually!!
{VSN,
[{"4.3.0",[{load_module,emqx_lua_script,brutal_purge,soft_purge,[]}]}],
[{"4.3.0",[{load_module,emqx_lua_script,brutal_purge,soft_purge,[]}]}]}.
[{"4.3.1",[{load_module,emqx_lua_script,brutal_purge,soft_purge,[]}]},
{"4.3.0",[{load_module,emqx_lua_script,brutal_purge,soft_purge,[]}]}],
[{"4.3.1",[{load_module,emqx_lua_script,brutal_purge,soft_purge,[]}]},
{"4.3.0",[{load_module,emqx_lua_script,brutal_purge,soft_purge,[]}]}]}.

View File

@ -283,13 +283,14 @@ on_message_publish(Message = #message{from = ClientId,
?LOG(error, "Failed to execute function on_message_publish(), which has syntax error, St=~p", [St]),
{ok, Message};
{[false], _St} ->
{stop, Message};
?LOG(debug, "Lua function on_message_publish() returned false, setting allow_publish header to false", []),
{stop, Message#message{headers = Headers#{allow_publish => false}}};
{[NewTopic, NewPayload, NewQos, NewRetain], _St} ->
?LOG(debug, "Lua function on_message_publish() return ~p", [{NewTopic, NewPayload, NewQos, NewRetain}]),
?LOG(debug, "Lua function on_message_publish() returned ~p", [{NewTopic, NewPayload, NewQos, NewRetain}]),
{ok, Message#message{topic = NewTopic, payload = NewPayload,
qos = round(NewQos), flags = Flags#{retain => to_retain(NewRetain)}}};
Other ->
?LOG(error, "Topic=~p, lua function on_message_publish caught exception, ~p", [Topic, Other]),
?LOG(error, "Topic=~p, lua function on_message_publish() caught exception, ~p", [Topic, Other]),
{ok, Message}
end.

View File

@ -92,7 +92,7 @@ case02(_Config) ->
Msg = #message{from = <<"myclient">>, qos = 2, flags = #{retain => true}, topic = <<"a/b/c">>, payload = <<"123">>, headers = #{username => <<"tester">>}},
Ret = emqx_hooks:run_fold('message.publish',[], Msg),
?assertEqual(Msg, Ret).
?assertEqual(Msg#message{headers = #{username => <<"tester">>, allow_publish => false}}, Ret).
case03(_Config) ->
ScriptName = filename:join([emqx_lua_hook:lua_dir(), "abc.lua"]),

View File

@ -46,7 +46,7 @@
load(RawRules) ->
{PubRules, SubRules} = compile(RawRules),
?LOG(info, "[Rewrite] Load rule pub ~0p sub ~0p", [PubRules, SubRules]),
log_start(RawRules),
emqx_hooks:put('client.subscribe', {?MODULE, rewrite_subscribe, [SubRules]}, 1000),
emqx_hooks:put('client.unsubscribe', {?MODULE, rewrite_unsubscribe, [SubRules]}, 1000),
emqx_hooks:put('message.publish', {?MODULE, rewrite_publish, [PubRules]}, 1000).
@ -75,6 +75,20 @@ description() ->
%% Internal functions
%%--------------------------------------------------------------------
log_start(Rules) ->
PubRules = [{pub, Topic, Re, Dest} || {rewrite, pub, Topic, Re, Dest} <- Rules],
SubRules = [{sub, Topic, Re, Dest} || {rewrite, sub, Topic, Re, Dest} <- Rules],
?LOG(info, "[Rewrite] Load: pub rules count ~p sub rules count ~p",
[erlang:length(PubRules), erlang:length(SubRules)]),
log_rule(PubRules, 1),
log_rule(SubRules, 1).
log_rule([], _Index) -> ok;
log_rule([{Type, Topic, Re, Dest} | Rules], Index) ->
?LOG(info, "[Rewrite] Load ~p rule[~p]: source: ~ts, re: ~ts, dest: ~ts",
[Type, Index, Topic, Re, Dest]),
log_rule(Rules, Index + 1).
compile(Rules) ->
PubRules = [ begin
{ok, MP} = re:compile(Re),

View File

@ -7,7 +7,8 @@
{update,emqx_broker_sup,supervisor},
{load_module,emqx_app,brutal_purge,soft_purge,[]},
{load_module,emqx_access_control,brutal_purge,soft_purge,[]},
{load_module,emqx_shared_sub,brutal_purge,soft_purge,[]}]},
{load_module,emqx_shared_sub,brutal_purge,soft_purge,[]},
{load_module,emqx_session,brutal_purge,soft_purge,[]}]},
{"4.4.4",
[{load_module,emqx_access_control,brutal_purge,soft_purge,[]},
{update,emqx_broker_sup,supervisor},
@ -23,7 +24,8 @@
{load_module,emqx_relup,brutal_purge,soft_purge,[]},
{load_module,emqx_app,brutal_purge,soft_purge,[]},
{load_module,emqx_plugins,brutal_purge,soft_purge,[]},
{load_module,emqx_metrics,brutal_purge,soft_purge,[]}]},
{load_module,emqx_metrics,brutal_purge,soft_purge,[]},
{load_module,emqx_session,brutal_purge,soft_purge,[]}]},
{"4.4.3",
[{add_module,emqx_calendar},
{update,emqx_broker_sup,supervisor},
@ -163,6 +165,7 @@
{load_module,emqx_relup,brutal_purge,soft_purge,[]},
{load_module,emqx_app,brutal_purge,soft_purge,[]},
{load_module,emqx_access_control,brutal_purge,soft_purge,[]},
{load_module,emqx_session,brutal_purge,soft_purge,[]},
{load_module,emqx_shared_sub,brutal_purge,soft_purge,[]}]},
{"4.4.4",
[{load_module,emqx_access_control,brutal_purge,soft_purge,[]},
@ -179,6 +182,7 @@
{load_module,emqx_relup,brutal_purge,soft_purge,[]},
{load_module,emqx_app,brutal_purge,soft_purge,[]},
{load_module,emqx_plugins,brutal_purge,soft_purge,[]},
{load_module,emqx_session,brutal_purge,soft_purge,[]},
{load_module,emqx_metrics,brutal_purge,soft_purge,[]}]},
{"4.4.3",
[{load_module,emqx_broker,brutal_purge,soft_purge,[]},
@ -271,6 +275,7 @@
{delete_module,emqx_relup}]},
{"4.4.0",
[{load_module,emqx_broker,brutal_purge,soft_purge,[]},
{load_module,emqx_message,brutal_purge,soft_purge,[]},
{update,emqx_broker_sup,supervisor},
{load_module,emqx_mqtt_caps,brutal_purge,soft_purge,[]},
{load_module,emqx_topic,brutal_purge,soft_purge,[]},

View File

@ -672,7 +672,8 @@ replay(ClientInfo, Session = #session{inflight = Inflight}) ->
-spec(terminate(emqx_types:clientinfo(), Reason :: term(), session()) -> ok).
terminate(ClientInfo, Reason, Session) ->
run_terminate_hooks(ClientInfo, Reason, Session),
redispatch_shared_messages(Session),
Reason =/= takeovered andalso
redispatch_shared_messages(Session),
ok.
run_terminate_hooks(ClientInfo, discarded, Session) ->