diff --git a/CHANGES-4.3.md b/CHANGES-4.3.md index 8b6541a25..3fffadc63 100644 --- a/CHANGES-4.3.md +++ b/CHANGES-4.3.md @@ -31,6 +31,7 @@ File format: node. It will improves the efficiency of shared messages dispatching in certain scenarios, especially when the emqx-bridge-mqtt plugin is configured as shared subscription. [#7462] +* Add some compression functions to rule-engine: gzip, gunzip, zip, unzip, zip_compress, zip_uncompress ### Bug fixes diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl b/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl index 6a50c553d..baa47d71d 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl @@ -222,19 +222,29 @@ test_rule_sql(Params) -> do_create_rule(Params) -> case parse_rule_params(Params) of {ok, ParsedParams} -> - case emqx_rule_engine:create_rule(ParsedParams) of - {ok, Rule} -> return({ok, record_to_map(Rule)}); - {error, {action_not_found, ActionName}} -> - return({error, 400, ?ERR_NO_ACTION(ActionName)}); - {error, Reason} -> - ?LOG(error, "~p failed: ~0p", [?FUNCTION_NAME, Reason]), - return({error, 400, ?ERR_BADARGS(Reason)}) + case maps:find(id, ParsedParams) of + {ok, RuleId} -> + case emqx_rule_registry:get_rule(RuleId) of + {ok, _} -> return({error, 400, <<"Already Exists">>}); + not_found -> do_create_rule2(ParsedParams) + end; + error -> do_create_rule2(ParsedParams) end; {error, Reason} -> ?LOG(error, "~p failed: ~0p", [?FUNCTION_NAME, Reason]), return({error, 400, ?ERR_BADARGS(Reason)}) end. +do_create_rule2(ParsedParams) -> + case emqx_rule_engine:create_rule(ParsedParams) of + {ok, Rule} -> return({ok, record_to_map(Rule)}); + {error, {action_not_found, ActionName}} -> + return({error, 400, ?ERR_NO_ACTION(ActionName)}); + {error, Reason} -> + ?LOG(error, "~p failed: ~0p", [?FUNCTION_NAME, Reason]), + return({error, 400, ?ERR_BADARGS(Reason)}) + end. + update_rule(#{id := Id}, Params) -> case parse_rule_params(Params, #{id => Id}) of {ok, ParsedParams} -> diff --git a/apps/emqx_rule_engine/src/emqx_rule_funcs.erl b/apps/emqx_rule_engine/src/emqx_rule_funcs.erl index 7c972323a..fd8206327 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_funcs.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_funcs.erl @@ -182,6 +182,11 @@ , unzip/1 ]). +%% compressed Funcs +-export([ zip_compress/1 + , zip_uncompress/1 + ]). + %% Data encode and decode -export([ base64_encode/1 , base64_decode/1 @@ -823,6 +828,16 @@ zip(S) when is_binary(S) -> unzip(S) when is_binary(S) -> zlib:unzip(S). +%%------------------------------------------------------------------------------ +%% zip_compress Funcs +%%------------------------------------------------------------------------------ + +zip_compress(S) when is_binary(S) -> + zlib:compress(S). + +zip_uncompress(S) when is_binary(S) -> + zlib:uncompress(S). + %%------------------------------------------------------------------------------ %% Data encode and decode Funcs %%------------------------------------------------------------------------------ diff --git a/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl b/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl index 78bfbcad8..d71c05746 100644 --- a/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl +++ b/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl @@ -430,6 +430,14 @@ t_crud_rule_api(_Config) -> {<<"params">>,[{<<"arg1">>,1}]}]]}, {<<"description">>, <<"debug rule">>}]), RuleID = maps:get(id, Rule), + {ok, #{code := 400, message := <<"Already Exists">>}} = + emqx_rule_engine_api:create_rule(#{}, + [{<<"name">>, <<"debug-rule">>}, + {<<"id">>, RuleID}, + {<<"rawsql">>, <<"select * from \"t/a\"">>}, + {<<"actions">>, [[{<<"name">>,<<"inspect">>}, + {<<"params">>,[{<<"arg1">>,1}]}]]}, + {<<"description">>, <<"debug rule">>}]), %ct:pal("RCreated : ~p", [Rule]), {ok, #{code := 0, data := Rules}} = emqx_rule_engine_api:list_rules(#{}, []), diff --git a/apps/emqx_rule_engine/test/emqx_rule_funcs_SUITE.erl b/apps/emqx_rule_engine/test/emqx_rule_funcs_SUITE.erl index 350a57051..0f82b691d 100644 --- a/apps/emqx_rule_engine/test/emqx_rule_funcs_SUITE.erl +++ b/apps/emqx_rule_engine/test/emqx_rule_funcs_SUITE.erl @@ -632,6 +632,17 @@ prop_zip_fun() -> ?FORALL(S, binary(), S == apply_func(unzip, [apply_func(zip, [S])])). +%%------------------------------------------------------------------------------ +%% Test cases for zip funcs +%%------------------------------------------------------------------------------ + +t_zip_compress_funcs(_) -> + ?PROPTEST(prop_zip_compress_fun). + +prop_zip_compress_fun() -> + ?FORALL(S, binary(), + S == apply_func(zip_uncompress, [apply_func(zip_compress, [S])])). + %%------------------------------------------------------------------------------ %% Test cases for base64 %%------------------------------------------------------------------------------ diff --git a/include/emqx_release.hrl b/include/emqx_release.hrl index b3b06c23f..f0a4f6d78 100644 --- a/include/emqx_release.hrl +++ b/include/emqx_release.hrl @@ -29,7 +29,7 @@ -ifndef(EMQX_ENTERPRISE). --define(EMQX_RELEASE, {opensource, "4.4.3-rc.2"}). +-define(EMQX_RELEASE, {opensource, "4.4.3-rc.3"}). -else. diff --git a/priv/emqx.schema b/priv/emqx.schema index d057610d0..d799f170d 100644 --- a/priv/emqx.schema +++ b/priv/emqx.schema @@ -2434,7 +2434,7 @@ end}. {translation, "emqx.shared_subscription_strategy_per_group", fun(Conf) -> Conf0 = cuttlefish_variable:filter_by_prefix("broker", Conf), Groups = lists:filtermap(fun({["broker", Group, "shared_subscription_strategy"], Strategy}) -> - {true, {Group, list_to_binary(Strategy)}}; + {true, {list_to_binary(Group), Strategy}}; (_) -> false end, Conf0),