Merge remote-tracking branch 'ce/main-v4.3' into merge-main-v4.3-into-v4.4
This commit is contained in:
commit
c8c52dfc39
|
@ -31,6 +31,7 @@ File format:
|
||||||
node. It will improves the efficiency of shared messages dispatching in certain
|
node. It will improves the efficiency of shared messages dispatching in certain
|
||||||
scenarios, especially when the emqx-bridge-mqtt plugin is configured as shared
|
scenarios, especially when the emqx-bridge-mqtt plugin is configured as shared
|
||||||
subscription. [#7462]
|
subscription. [#7462]
|
||||||
|
* Add some compression functions to rule-engine: gzip, gunzip, zip, unzip, zip_compress, zip_uncompress
|
||||||
|
|
||||||
### Bug fixes
|
### Bug fixes
|
||||||
|
|
||||||
|
|
|
@ -222,19 +222,29 @@ test_rule_sql(Params) ->
|
||||||
do_create_rule(Params) ->
|
do_create_rule(Params) ->
|
||||||
case parse_rule_params(Params) of
|
case parse_rule_params(Params) of
|
||||||
{ok, ParsedParams} ->
|
{ok, ParsedParams} ->
|
||||||
case emqx_rule_engine:create_rule(ParsedParams) of
|
case maps:find(id, ParsedParams) of
|
||||||
{ok, Rule} -> return({ok, record_to_map(Rule)});
|
{ok, RuleId} ->
|
||||||
{error, {action_not_found, ActionName}} ->
|
case emqx_rule_registry:get_rule(RuleId) of
|
||||||
return({error, 400, ?ERR_NO_ACTION(ActionName)});
|
{ok, _} -> return({error, 400, <<"Already Exists">>});
|
||||||
{error, Reason} ->
|
not_found -> do_create_rule2(ParsedParams)
|
||||||
?LOG(error, "~p failed: ~0p", [?FUNCTION_NAME, Reason]),
|
end;
|
||||||
return({error, 400, ?ERR_BADARGS(Reason)})
|
error -> do_create_rule2(ParsedParams)
|
||||||
end;
|
end;
|
||||||
{error, Reason} ->
|
{error, Reason} ->
|
||||||
?LOG(error, "~p failed: ~0p", [?FUNCTION_NAME, Reason]),
|
?LOG(error, "~p failed: ~0p", [?FUNCTION_NAME, Reason]),
|
||||||
return({error, 400, ?ERR_BADARGS(Reason)})
|
return({error, 400, ?ERR_BADARGS(Reason)})
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
do_create_rule2(ParsedParams) ->
|
||||||
|
case emqx_rule_engine:create_rule(ParsedParams) of
|
||||||
|
{ok, Rule} -> return({ok, record_to_map(Rule)});
|
||||||
|
{error, {action_not_found, ActionName}} ->
|
||||||
|
return({error, 400, ?ERR_NO_ACTION(ActionName)});
|
||||||
|
{error, Reason} ->
|
||||||
|
?LOG(error, "~p failed: ~0p", [?FUNCTION_NAME, Reason]),
|
||||||
|
return({error, 400, ?ERR_BADARGS(Reason)})
|
||||||
|
end.
|
||||||
|
|
||||||
update_rule(#{id := Id}, Params) ->
|
update_rule(#{id := Id}, Params) ->
|
||||||
case parse_rule_params(Params, #{id => Id}) of
|
case parse_rule_params(Params, #{id => Id}) of
|
||||||
{ok, ParsedParams} ->
|
{ok, ParsedParams} ->
|
||||||
|
|
|
@ -182,6 +182,11 @@
|
||||||
, unzip/1
|
, unzip/1
|
||||||
]).
|
]).
|
||||||
|
|
||||||
|
%% compressed Funcs
|
||||||
|
-export([ zip_compress/1
|
||||||
|
, zip_uncompress/1
|
||||||
|
]).
|
||||||
|
|
||||||
%% Data encode and decode
|
%% Data encode and decode
|
||||||
-export([ base64_encode/1
|
-export([ base64_encode/1
|
||||||
, base64_decode/1
|
, base64_decode/1
|
||||||
|
@ -823,6 +828,16 @@ zip(S) when is_binary(S) ->
|
||||||
unzip(S) when is_binary(S) ->
|
unzip(S) when is_binary(S) ->
|
||||||
zlib:unzip(S).
|
zlib:unzip(S).
|
||||||
|
|
||||||
|
%%------------------------------------------------------------------------------
|
||||||
|
%% zip_compress Funcs
|
||||||
|
%%------------------------------------------------------------------------------
|
||||||
|
|
||||||
|
zip_compress(S) when is_binary(S) ->
|
||||||
|
zlib:compress(S).
|
||||||
|
|
||||||
|
zip_uncompress(S) when is_binary(S) ->
|
||||||
|
zlib:uncompress(S).
|
||||||
|
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
%% Data encode and decode Funcs
|
%% Data encode and decode Funcs
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
|
|
|
@ -430,6 +430,14 @@ t_crud_rule_api(_Config) ->
|
||||||
{<<"params">>,[{<<"arg1">>,1}]}]]},
|
{<<"params">>,[{<<"arg1">>,1}]}]]},
|
||||||
{<<"description">>, <<"debug rule">>}]),
|
{<<"description">>, <<"debug rule">>}]),
|
||||||
RuleID = maps:get(id, Rule),
|
RuleID = maps:get(id, Rule),
|
||||||
|
{ok, #{code := 400, message := <<"Already Exists">>}} =
|
||||||
|
emqx_rule_engine_api:create_rule(#{},
|
||||||
|
[{<<"name">>, <<"debug-rule">>},
|
||||||
|
{<<"id">>, RuleID},
|
||||||
|
{<<"rawsql">>, <<"select * from \"t/a\"">>},
|
||||||
|
{<<"actions">>, [[{<<"name">>,<<"inspect">>},
|
||||||
|
{<<"params">>,[{<<"arg1">>,1}]}]]},
|
||||||
|
{<<"description">>, <<"debug rule">>}]),
|
||||||
%ct:pal("RCreated : ~p", [Rule]),
|
%ct:pal("RCreated : ~p", [Rule]),
|
||||||
|
|
||||||
{ok, #{code := 0, data := Rules}} = emqx_rule_engine_api:list_rules(#{}, []),
|
{ok, #{code := 0, data := Rules}} = emqx_rule_engine_api:list_rules(#{}, []),
|
||||||
|
|
|
@ -632,6 +632,17 @@ prop_zip_fun() ->
|
||||||
?FORALL(S, binary(),
|
?FORALL(S, binary(),
|
||||||
S == apply_func(unzip, [apply_func(zip, [S])])).
|
S == apply_func(unzip, [apply_func(zip, [S])])).
|
||||||
|
|
||||||
|
%%------------------------------------------------------------------------------
|
||||||
|
%% Test cases for zip funcs
|
||||||
|
%%------------------------------------------------------------------------------
|
||||||
|
|
||||||
|
t_zip_compress_funcs(_) ->
|
||||||
|
?PROPTEST(prop_zip_compress_fun).
|
||||||
|
|
||||||
|
prop_zip_compress_fun() ->
|
||||||
|
?FORALL(S, binary(),
|
||||||
|
S == apply_func(zip_uncompress, [apply_func(zip_compress, [S])])).
|
||||||
|
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
%% Test cases for base64
|
%% Test cases for base64
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
|
|
|
@ -29,7 +29,7 @@
|
||||||
|
|
||||||
-ifndef(EMQX_ENTERPRISE).
|
-ifndef(EMQX_ENTERPRISE).
|
||||||
|
|
||||||
-define(EMQX_RELEASE, {opensource, "4.4.3-rc.2"}).
|
-define(EMQX_RELEASE, {opensource, "4.4.3-rc.3"}).
|
||||||
|
|
||||||
-else.
|
-else.
|
||||||
|
|
||||||
|
|
|
@ -2434,7 +2434,7 @@ end}.
|
||||||
{translation, "emqx.shared_subscription_strategy_per_group", fun(Conf) ->
|
{translation, "emqx.shared_subscription_strategy_per_group", fun(Conf) ->
|
||||||
Conf0 = cuttlefish_variable:filter_by_prefix("broker", Conf),
|
Conf0 = cuttlefish_variable:filter_by_prefix("broker", Conf),
|
||||||
Groups = lists:filtermap(fun({["broker", Group, "shared_subscription_strategy"], Strategy}) ->
|
Groups = lists:filtermap(fun({["broker", Group, "shared_subscription_strategy"], Strategy}) ->
|
||||||
{true, {Group, list_to_binary(Strategy)}};
|
{true, {list_to_binary(Group), Strategy}};
|
||||||
(_) ->
|
(_) ->
|
||||||
false
|
false
|
||||||
end, Conf0),
|
end, Conf0),
|
||||||
|
|
Loading…
Reference in New Issue