From 29584ca7215937578d969ce7dc13742df19f193d Mon Sep 17 00:00:00 2001 From: Kjell Winblad Date: Fri, 14 Apr 2023 18:23:15 +0200 Subject: [PATCH 1/3] feat: add mongo_date functions to the rule engine This commit adds mong_date built-in functions to the rule engine SQL-like language. Corresponding functions already existed in EMQX 4.4 and this commit makes sure that EMQX 5.X also has these functions. Fixes: https://emqx.atlassian.net/browse/EMQX-9244 --- apps/emqx_rule_engine/src/emqx_rule_funcs.erl | 28 ++++++ changes/ee/feat-10408.en.md | 1 + .../test/emqx_ee_bridge_mongodb_SUITE.erl | 75 +++++++++++++++- .../src/emqx_ee_connector_mongodb.erl | 88 ++++++++++++++++++- 4 files changed, 188 insertions(+), 4 deletions(-) create mode 100644 changes/ee/feat-10408.en.md diff --git a/apps/emqx_rule_engine/src/emqx_rule_funcs.erl b/apps/emqx_rule_engine/src/emqx_rule_funcs.erl index 81435d9ac..dfb79a40c 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_funcs.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_funcs.erl @@ -230,6 +230,16 @@ date_to_unix_ts/4 ]). +%% MongoDB specific date functions. These functions return a date tuple. The +%% MongoDB bridge converts such date tuples to a MongoDB date type. The +%% following functions are therefore only useful for rules with at least one +%% MongoDB action. +-export([ + mongo_date/0, + mongo_date/1, + mongo_date/2 +]). + %% Proc Dict Func -export([ proc_dict_get/1, @@ -1135,3 +1145,21 @@ function_literal(Fun, [FArg | Args]) when is_atom(Fun), is_list(Args) -> ) ++ ")"; function_literal(Fun, Args) -> {invalid_func, {Fun, Args}}. + +mongo_date() -> + erlang:timestamp(). + +mongo_date(MillisecondsTimestamp) -> + convert_timestamp(MillisecondsTimestamp). + +mongo_date(Timestamp, Unit) -> + InsertedTimeUnit = time_unit(Unit), + ScaledEpoch = erlang:convert_time_unit(Timestamp, InsertedTimeUnit, millisecond), + convert_timestamp(ScaledEpoch). + +convert_timestamp(MillisecondsTimestamp) -> + MicroTimestamp = MillisecondsTimestamp * 1000, + MegaSecs = MicroTimestamp div 1000_000_000_000, + Secs = MicroTimestamp div 1000_000 - MegaSecs * 1000_000, + MicroSecs = MicroTimestamp rem 1000_000, + {MegaSecs, Secs, MicroSecs}. diff --git a/changes/ee/feat-10408.en.md b/changes/ee/feat-10408.en.md new file mode 100644 index 000000000..bc18b8a80 --- /dev/null +++ b/changes/ee/feat-10408.en.md @@ -0,0 +1 @@ +The rule engine SQL-like language has got three more built-in functions for creating values of the MongoDB date type. These functions are only useful for rules with at least one MongoDB bridge action. diff --git a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_mongodb_SUITE.erl b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_mongodb_SUITE.erl index 0959e3c78..1c3c4a2c3 100644 --- a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_mongodb_SUITE.erl +++ b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_mongodb_SUITE.erl @@ -27,7 +27,8 @@ group_tests() -> t_setup_via_config_and_publish, t_setup_via_http_api_and_publish, t_payload_template, - t_collection_template + t_collection_template, + t_mongo_date_rule_engine_functions ]. groups() -> @@ -140,10 +141,11 @@ start_apps() -> %% we want to make sure they are loaded before %% ekka start in emqx_common_test_helpers:start_apps/1 emqx_common_test_helpers:render_and_load_app_config(emqx_conf), - ok = emqx_common_test_helpers:start_apps([emqx_conf, emqx_bridge]). + ok = emqx_common_test_helpers:start_apps([emqx_conf, emqx_rule_engine, emqx_bridge]). ensure_loaded() -> _ = application:load(emqx_ee_bridge), + _ = application:load(emqtt), _ = emqx_ee_bridge:module_info(), ok. @@ -282,6 +284,27 @@ find_all(Config) -> ResourceID = emqx_bridge_resource:resource_id(Type, Name), emqx_resource:simple_sync_query(ResourceID, {find, Collection, #{}, #{}}). +find_all_wait_until_non_empty(Config) -> + wait_until( + fun() -> + case find_all(Config) of + {ok, []} -> false; + _ -> true + end + end, + 5_000 + ), + find_all(Config). + +wait_until(Fun, Timeout) when Timeout >= 0 -> + case Fun() of + true -> + ok; + false -> + timer:sleep(100), + wait_until(Fun, Timeout - 100) + end. + send_message(Config, Payload) -> Name = ?config(mongo_name, Config), Type = mongo_type_bin(?config(mongo_type, Config)), @@ -376,3 +399,51 @@ t_collection_template(Config) -> find_all(Config) ), ok. + +t_mongo_date_rule_engine_functions(Config) -> + {ok, _} = + create_bridge( + Config, + #{ + <<"payload_template">> => + <<"{\"date_0\": ${date_0}, \"date_1\": ${date_1}, \"date_2\": ${date_2}}">> + } + ), + Type = mongo_type_bin(?config(mongo_type, Config)), + Name = ?config(mongo_name, Config), + SQL = + "SELECT mongo_date() as date_0, mongo_date(1000) as date_1, mongo_date(1, 'second') as date_2 FROM " + "\"t_mongo_date_rule_engine_functions/topic\"", + %% Remove rule if it already exists + RuleId = <<"rule:t_mongo_date_rule_engine_functions">>, + emqx_rule_engine:delete_rule(RuleId), + BridgeId = emqx_bridge_resource:bridge_id(Type, Name), + {ok, Rule} = emqx_rule_engine:create_rule( + #{ + id => <<"rule:t_mongo_date_rule_engine_functions">>, + sql => SQL, + actions => [ + BridgeId, + #{function => console} + ], + description => <<"to mongo bridge">> + } + ), + %% Send a message to topic + {ok, Client} = emqtt:start_link([{clientid, <<"pub-02">>}, {proto_ver, v5}]), + {ok, _} = emqtt:connect(Client), + emqtt:publish(Client, <<"t_mongo_date_rule_engine_functions/topic">>, #{}, <<"{\"x\":1}">>, [ + {qos, 2} + ]), + emqtt:stop(Client), + ?assertMatch( + {ok, [ + #{ + <<"date_0">> := {_, _, _}, + <<"date_1">> := {0, 1, 0}, + <<"date_2">> := {0, 1, 0} + } + ]}, + find_all_wait_until_non_empty(Config) + ), + ok. diff --git a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_mongodb.erl b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_mongodb.erl index 4e7adcd6e..90c422643 100644 --- a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_mongodb.erl +++ b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_mongodb.erl @@ -83,5 +83,89 @@ render_message(undefined = _PayloadTemplate, Message) -> render_message(PayloadTemplate, Message) -> %% Note: mongo expects a map as a document, so the rendered result %% must be JSON-serializable - Rendered = emqx_plugin_libs_rule:proc_tmpl(PayloadTemplate, Message), - emqx_utils_json:decode(Rendered, [return_maps]). + format_data(PayloadTemplate, Message). + +%% The following function was originally copied over from +%% https://github.com/emqx/emqx-enterprise/commit/50e3628129720f13f544053600ca1502731e29e0. +%% The rule engine has support for producing fields that are date tuples +%% (produced by the SQL language's built-in functions mongo_date/0, +%% mongo_date/1 and mongo_date/2) which the MongoDB driver recognizes and +%% converts to the MongoDB ISODate type +%% (https://www.compose.com/articles/understanding-dates-in-compose-mongodb/). +%% For this to work we have to replace the tuple values with references, make +%% an instance of the template, convert the instance to map with the help of +%% emqx_utils_json:decode and then finally replace the references with the +%% corresponding tuples in the resulting map. +format_data(PayloadTks, Msg) -> + % Check the Message for any tuples that need to be extracted before running the template though a json parser + PreparedTupleMap = create_mapping_of_references_to_tuple_values(Msg), + case maps:size(PreparedTupleMap) of + % If no tuples were found simply proceed with the json decoding and be done with it + 0 -> + emqx_utils_json:decode(emqx_plugin_libs_rule:proc_tmpl(PayloadTks, Msg), [return_maps]); + _ -> + % If tuples were found, replace the tuple values with the references created, run + % the modified message through the json parser, and then at the end replace the + % references with the actual tuple values. + ProcessedMessage = replace_message_values_with_references(Msg, PreparedTupleMap), + DecodedMap = emqx_utils_json:decode( + emqx_plugin_libs_rule:proc_tmpl(PayloadTks, ProcessedMessage), [return_maps] + ), + populate_map_with_tuple_values(PreparedTupleMap, DecodedMap) + end. + +replace_message_values_with_references(RawMessage, TupleMap) -> + % Iterate over every created reference/value pair and inject the reference into the message + maps:fold( + fun(Reference, OriginalValue, OriginalMessage) -> + % Iterate over the Message, which is a map, and look for the element which + % matches the Value in the map which holds the references/original values and replace + % with the reference + maps:fold( + fun(Key, Value, NewMap) -> + case Value == OriginalValue of + true -> + %% Wrap the reference in a string to make it JSON-serializable + StringRef = io_lib:format("\"~s\"", [Reference]), + WrappedRef = erlang:iolist_to_binary(StringRef), + maps:put(Key, WrappedRef, NewMap); + false -> + maps:put(Key, Value, NewMap) + end + end, + #{}, + OriginalMessage + ) + end, + RawMessage, + TupleMap + ). + +create_mapping_of_references_to_tuple_values(Message) -> + maps:fold( + fun + (_Key, Value, TupleMap) when is_tuple(Value) -> + Ref0 = emqx_guid:to_hexstr(emqx_guid:gen()), + Ref = <<"MONGO_DATE_REF_", Ref0/binary>>, + maps:put(Ref, Value, TupleMap); + (_key, _value, TupleMap) -> + TupleMap + end, + #{}, + Message + ). + +populate_map_with_tuple_values(TupleMap, MapToMap) -> + MappingFun = + fun + (_Key, Value) when is_map(Value) -> + populate_map_with_tuple_values(TupleMap, Value); + (_Key, Value) -> + case maps:is_key(Value, TupleMap) of + true -> + maps:get(Value, TupleMap); + false -> + Value + end + end, + maps:map(MappingFun, MapToMap). From 88360113106fded2af1d11899d4cb313616fbe5d Mon Sep 17 00:00:00 2001 From: Kjell Winblad Date: Mon, 17 Apr 2023 18:08:03 +0200 Subject: [PATCH 2/3] docs: better changelog entry for mongo_date functions Co-authored-by: Thales Macedo Garitezi --- changes/ee/feat-10408.en.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/changes/ee/feat-10408.en.md b/changes/ee/feat-10408.en.md index bc18b8a80..75cc4b945 100644 --- a/changes/ee/feat-10408.en.md +++ b/changes/ee/feat-10408.en.md @@ -1 +1 @@ -The rule engine SQL-like language has got three more built-in functions for creating values of the MongoDB date type. These functions are only useful for rules with at least one MongoDB bridge action. +The rule engine SQL-like language has got three more built-in functions for creating values of the MongoDB date type. These functions are useful for rules with MongoDB bridge actions only and not supported in other actions. From 8a3fccb3309b64e6ecf562580281d7262e480ce0 Mon Sep 17 00:00:00 2001 From: Kjell Winblad Date: Tue, 18 Apr 2023 10:32:37 +0200 Subject: [PATCH 3/3] style: fix variable name style --- lib-ee/emqx_ee_connector/src/emqx_ee_connector_mongodb.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_mongodb.erl b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_mongodb.erl index 90c422643..59f763904 100644 --- a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_mongodb.erl +++ b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_mongodb.erl @@ -148,7 +148,7 @@ create_mapping_of_references_to_tuple_values(Message) -> Ref0 = emqx_guid:to_hexstr(emqx_guid:gen()), Ref = <<"MONGO_DATE_REF_", Ref0/binary>>, maps:put(Ref, Value, TupleMap); - (_key, _value, TupleMap) -> + (_Key, _Value, TupleMap) -> TupleMap end, #{},