Merge pull request #7619 from HJianBo/merge-main-v4.3-into-v4.4

Pre-release v4.4.3-rc.3: Merge main v4.3 into v4.4
This commit is contained in:
Xinyu Liu 2022-04-14 11:58:30 +08:00 committed by GitHub
commit 9b18bcd3c2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 54 additions and 9 deletions

View File

@ -31,6 +31,7 @@ File format:
node. It will improves the efficiency of shared messages dispatching in certain node. It will improves the efficiency of shared messages dispatching in certain
scenarios, especially when the emqx-bridge-mqtt plugin is configured as shared scenarios, especially when the emqx-bridge-mqtt plugin is configured as shared
subscription. [#7462] subscription. [#7462]
* Add some compression functions to rule-engine: gzip, gunzip, zip, unzip, zip_compress, zip_uncompress
### Bug fixes ### Bug fixes

View File

@ -222,19 +222,29 @@ test_rule_sql(Params) ->
do_create_rule(Params) -> do_create_rule(Params) ->
case parse_rule_params(Params) of case parse_rule_params(Params) of
{ok, ParsedParams} -> {ok, ParsedParams} ->
case emqx_rule_engine:create_rule(ParsedParams) of case maps:find(id, ParsedParams) of
{ok, Rule} -> return({ok, record_to_map(Rule)}); {ok, RuleId} ->
{error, {action_not_found, ActionName}} -> case emqx_rule_registry:get_rule(RuleId) of
return({error, 400, ?ERR_NO_ACTION(ActionName)}); {ok, _} -> return({error, 400, <<"Already Exists">>});
{error, Reason} -> not_found -> do_create_rule2(ParsedParams)
?LOG(error, "~p failed: ~0p", [?FUNCTION_NAME, Reason]), end;
return({error, 400, ?ERR_BADARGS(Reason)}) error -> do_create_rule2(ParsedParams)
end; end;
{error, Reason} -> {error, Reason} ->
?LOG(error, "~p failed: ~0p", [?FUNCTION_NAME, Reason]), ?LOG(error, "~p failed: ~0p", [?FUNCTION_NAME, Reason]),
return({error, 400, ?ERR_BADARGS(Reason)}) return({error, 400, ?ERR_BADARGS(Reason)})
end. end.
do_create_rule2(ParsedParams) ->
case emqx_rule_engine:create_rule(ParsedParams) of
{ok, Rule} -> return({ok, record_to_map(Rule)});
{error, {action_not_found, ActionName}} ->
return({error, 400, ?ERR_NO_ACTION(ActionName)});
{error, Reason} ->
?LOG(error, "~p failed: ~0p", [?FUNCTION_NAME, Reason]),
return({error, 400, ?ERR_BADARGS(Reason)})
end.
update_rule(#{id := Id}, Params) -> update_rule(#{id := Id}, Params) ->
case parse_rule_params(Params, #{id => Id}) of case parse_rule_params(Params, #{id => Id}) of
{ok, ParsedParams} -> {ok, ParsedParams} ->

View File

@ -182,6 +182,11 @@
, unzip/1 , unzip/1
]). ]).
%% compressed Funcs
-export([ zip_compress/1
, zip_uncompress/1
]).
%% Data encode and decode %% Data encode and decode
-export([ base64_encode/1 -export([ base64_encode/1
, base64_decode/1 , base64_decode/1
@ -823,6 +828,16 @@ zip(S) when is_binary(S) ->
unzip(S) when is_binary(S) -> unzip(S) when is_binary(S) ->
zlib:unzip(S). zlib:unzip(S).
%%------------------------------------------------------------------------------
%% zip_compress Funcs
%%------------------------------------------------------------------------------
zip_compress(S) when is_binary(S) ->
zlib:compress(S).
zip_uncompress(S) when is_binary(S) ->
zlib:uncompress(S).
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% Data encode and decode Funcs %% Data encode and decode Funcs
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------

View File

@ -430,6 +430,14 @@ t_crud_rule_api(_Config) ->
{<<"params">>,[{<<"arg1">>,1}]}]]}, {<<"params">>,[{<<"arg1">>,1}]}]]},
{<<"description">>, <<"debug rule">>}]), {<<"description">>, <<"debug rule">>}]),
RuleID = maps:get(id, Rule), RuleID = maps:get(id, Rule),
{ok, #{code := 400, message := <<"Already Exists">>}} =
emqx_rule_engine_api:create_rule(#{},
[{<<"name">>, <<"debug-rule">>},
{<<"id">>, RuleID},
{<<"rawsql">>, <<"select * from \"t/a\"">>},
{<<"actions">>, [[{<<"name">>,<<"inspect">>},
{<<"params">>,[{<<"arg1">>,1}]}]]},
{<<"description">>, <<"debug rule">>}]),
%ct:pal("RCreated : ~p", [Rule]), %ct:pal("RCreated : ~p", [Rule]),
{ok, #{code := 0, data := Rules}} = emqx_rule_engine_api:list_rules(#{}, []), {ok, #{code := 0, data := Rules}} = emqx_rule_engine_api:list_rules(#{}, []),

View File

@ -632,6 +632,17 @@ prop_zip_fun() ->
?FORALL(S, binary(), ?FORALL(S, binary(),
S == apply_func(unzip, [apply_func(zip, [S])])). S == apply_func(unzip, [apply_func(zip, [S])])).
%%------------------------------------------------------------------------------
%% Test cases for zip funcs
%%------------------------------------------------------------------------------
t_zip_compress_funcs(_) ->
?PROPTEST(prop_zip_compress_fun).
prop_zip_compress_fun() ->
?FORALL(S, binary(),
S == apply_func(zip_uncompress, [apply_func(zip_compress, [S])])).
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% Test cases for base64 %% Test cases for base64
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------

View File

@ -29,7 +29,7 @@
-ifndef(EMQX_ENTERPRISE). -ifndef(EMQX_ENTERPRISE).
-define(EMQX_RELEASE, {opensource, "4.4.3-rc.2"}). -define(EMQX_RELEASE, {opensource, "4.4.3-rc.3"}).
-else. -else.

View File

@ -2434,7 +2434,7 @@ end}.
{translation, "emqx.shared_subscription_strategy_per_group", fun(Conf) -> {translation, "emqx.shared_subscription_strategy_per_group", fun(Conf) ->
Conf0 = cuttlefish_variable:filter_by_prefix("broker", Conf), Conf0 = cuttlefish_variable:filter_by_prefix("broker", Conf),
Groups = lists:filtermap(fun({["broker", Group, "shared_subscription_strategy"], Strategy}) -> Groups = lists:filtermap(fun({["broker", Group, "shared_subscription_strategy"], Strategy}) ->
{true, {Group, list_to_binary(Strategy)}}; {true, {list_to_binary(Group), Strategy}};
(_) -> (_) ->
false false
end, Conf0), end, Conf0),