diff --git a/apps/emqx_rule_engine/src/emqx_rule_funcs.erl b/apps/emqx_rule_engine/src/emqx_rule_funcs.erl index dbd9c221c..02163f95b 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_funcs.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_funcs.erl @@ -231,6 +231,16 @@ date_to_unix_ts/4 ]). +%% MongoDB specific date functions. These functions return a date tuple. The +%% MongoDB bridge converts such date tuples to a MongoDB date type. The +%% following functions are therefore only useful for rules with at least one +%% MongoDB action. +-export([ + mongo_date/0, + mongo_date/1, + mongo_date/2 +]). + %% Proc Dict Func -export([ proc_dict_get/1, @@ -1139,3 +1149,21 @@ function_literal(Fun, [FArg | Args]) when is_atom(Fun), is_list(Args) -> ) ++ ")"; function_literal(Fun, Args) -> {invalid_func, {Fun, Args}}. + +mongo_date() -> + erlang:timestamp(). + +mongo_date(MillisecondsTimestamp) -> + convert_timestamp(MillisecondsTimestamp). + +mongo_date(Timestamp, Unit) -> + InsertedTimeUnit = time_unit(Unit), + ScaledEpoch = erlang:convert_time_unit(Timestamp, InsertedTimeUnit, millisecond), + convert_timestamp(ScaledEpoch). + +convert_timestamp(MillisecondsTimestamp) -> + MicroTimestamp = MillisecondsTimestamp * 1000, + MegaSecs = MicroTimestamp div 1000_000_000_000, + Secs = MicroTimestamp div 1000_000 - MegaSecs * 1000_000, + MicroSecs = MicroTimestamp rem 1000_000, + {MegaSecs, Secs, MicroSecs}. diff --git a/changes/ee/feat-10408.en.md b/changes/ee/feat-10408.en.md new file mode 100644 index 000000000..75cc4b945 --- /dev/null +++ b/changes/ee/feat-10408.en.md @@ -0,0 +1 @@ +The rule engine SQL-like language has got three more built-in functions for creating values of the MongoDB date type. These functions are useful for rules with MongoDB bridge actions only and not supported in other actions. diff --git a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_mongodb_SUITE.erl b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_mongodb_SUITE.erl index fc4270fd8..105f1fe75 100644 --- a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_mongodb_SUITE.erl +++ b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_mongodb_SUITE.erl @@ -27,7 +27,8 @@ group_tests() -> t_setup_via_config_and_publish, t_setup_via_http_api_and_publish, t_payload_template, - t_collection_template + t_collection_template, + t_mongo_date_rule_engine_functions ]. groups() -> @@ -140,10 +141,11 @@ start_apps() -> %% we want to make sure they are loaded before %% ekka start in emqx_common_test_helpers:start_apps/1 emqx_common_test_helpers:render_and_load_app_config(emqx_conf), - ok = emqx_common_test_helpers:start_apps([emqx_conf, emqx_bridge]). + ok = emqx_common_test_helpers:start_apps([emqx_conf, emqx_rule_engine, emqx_bridge]). ensure_loaded() -> _ = application:load(emqx_ee_bridge), + _ = application:load(emqtt), _ = emqx_ee_bridge:module_info(), ok. @@ -289,6 +291,27 @@ find_all(Config) -> ResourceID = emqx_bridge_resource:resource_id(Type, Name), emqx_resource:simple_sync_query(ResourceID, {find, Collection, #{}, #{}}). +find_all_wait_until_non_empty(Config) -> + wait_until( + fun() -> + case find_all(Config) of + {ok, []} -> false; + _ -> true + end + end, + 5_000 + ), + find_all(Config). + +wait_until(Fun, Timeout) when Timeout >= 0 -> + case Fun() of + true -> + ok; + false -> + timer:sleep(100), + wait_until(Fun, Timeout - 100) + end. + send_message(Config, Payload) -> Name = ?config(mongo_name, Config), Type = mongo_type_bin(?config(mongo_type, Config)), @@ -383,3 +406,51 @@ t_collection_template(Config) -> find_all(Config) ), ok. + +t_mongo_date_rule_engine_functions(Config) -> + {ok, _} = + create_bridge( + Config, + #{ + <<"payload_template">> => + <<"{\"date_0\": ${date_0}, \"date_1\": ${date_1}, \"date_2\": ${date_2}}">> + } + ), + Type = mongo_type_bin(?config(mongo_type, Config)), + Name = ?config(mongo_name, Config), + SQL = + "SELECT mongo_date() as date_0, mongo_date(1000) as date_1, mongo_date(1, 'second') as date_2 FROM " + "\"t_mongo_date_rule_engine_functions/topic\"", + %% Remove rule if it already exists + RuleId = <<"rule:t_mongo_date_rule_engine_functions">>, + emqx_rule_engine:delete_rule(RuleId), + BridgeId = emqx_bridge_resource:bridge_id(Type, Name), + {ok, Rule} = emqx_rule_engine:create_rule( + #{ + id => <<"rule:t_mongo_date_rule_engine_functions">>, + sql => SQL, + actions => [ + BridgeId, + #{function => console} + ], + description => <<"to mongo bridge">> + } + ), + %% Send a message to topic + {ok, Client} = emqtt:start_link([{clientid, <<"pub-02">>}, {proto_ver, v5}]), + {ok, _} = emqtt:connect(Client), + emqtt:publish(Client, <<"t_mongo_date_rule_engine_functions/topic">>, #{}, <<"{\"x\":1}">>, [ + {qos, 2} + ]), + emqtt:stop(Client), + ?assertMatch( + {ok, [ + #{ + <<"date_0">> := {_, _, _}, + <<"date_1">> := {0, 1, 0}, + <<"date_2">> := {0, 1, 0} + } + ]}, + find_all_wait_until_non_empty(Config) + ), + ok. diff --git a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_mongodb.erl b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_mongodb.erl index 4e7adcd6e..59f763904 100644 --- a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_mongodb.erl +++ b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_mongodb.erl @@ -83,5 +83,89 @@ render_message(undefined = _PayloadTemplate, Message) -> render_message(PayloadTemplate, Message) -> %% Note: mongo expects a map as a document, so the rendered result %% must be JSON-serializable - Rendered = emqx_plugin_libs_rule:proc_tmpl(PayloadTemplate, Message), - emqx_utils_json:decode(Rendered, [return_maps]). + format_data(PayloadTemplate, Message). + +%% The following function was originally copied over from +%% https://github.com/emqx/emqx-enterprise/commit/50e3628129720f13f544053600ca1502731e29e0. +%% The rule engine has support for producing fields that are date tuples +%% (produced by the SQL language's built-in functions mongo_date/0, +%% mongo_date/1 and mongo_date/2) which the MongoDB driver recognizes and +%% converts to the MongoDB ISODate type +%% (https://www.compose.com/articles/understanding-dates-in-compose-mongodb/). +%% For this to work we have to replace the tuple values with references, make +%% an instance of the template, convert the instance to map with the help of +%% emqx_utils_json:decode and then finally replace the references with the +%% corresponding tuples in the resulting map. +format_data(PayloadTks, Msg) -> + % Check the Message for any tuples that need to be extracted before running the template though a json parser + PreparedTupleMap = create_mapping_of_references_to_tuple_values(Msg), + case maps:size(PreparedTupleMap) of + % If no tuples were found simply proceed with the json decoding and be done with it + 0 -> + emqx_utils_json:decode(emqx_plugin_libs_rule:proc_tmpl(PayloadTks, Msg), [return_maps]); + _ -> + % If tuples were found, replace the tuple values with the references created, run + % the modified message through the json parser, and then at the end replace the + % references with the actual tuple values. + ProcessedMessage = replace_message_values_with_references(Msg, PreparedTupleMap), + DecodedMap = emqx_utils_json:decode( + emqx_plugin_libs_rule:proc_tmpl(PayloadTks, ProcessedMessage), [return_maps] + ), + populate_map_with_tuple_values(PreparedTupleMap, DecodedMap) + end. + +replace_message_values_with_references(RawMessage, TupleMap) -> + % Iterate over every created reference/value pair and inject the reference into the message + maps:fold( + fun(Reference, OriginalValue, OriginalMessage) -> + % Iterate over the Message, which is a map, and look for the element which + % matches the Value in the map which holds the references/original values and replace + % with the reference + maps:fold( + fun(Key, Value, NewMap) -> + case Value == OriginalValue of + true -> + %% Wrap the reference in a string to make it JSON-serializable + StringRef = io_lib:format("\"~s\"", [Reference]), + WrappedRef = erlang:iolist_to_binary(StringRef), + maps:put(Key, WrappedRef, NewMap); + false -> + maps:put(Key, Value, NewMap) + end + end, + #{}, + OriginalMessage + ) + end, + RawMessage, + TupleMap + ). + +create_mapping_of_references_to_tuple_values(Message) -> + maps:fold( + fun + (_Key, Value, TupleMap) when is_tuple(Value) -> + Ref0 = emqx_guid:to_hexstr(emqx_guid:gen()), + Ref = <<"MONGO_DATE_REF_", Ref0/binary>>, + maps:put(Ref, Value, TupleMap); + (_Key, _Value, TupleMap) -> + TupleMap + end, + #{}, + Message + ). + +populate_map_with_tuple_values(TupleMap, MapToMap) -> + MappingFun = + fun + (_Key, Value) when is_map(Value) -> + populate_map_with_tuple_values(TupleMap, Value); + (_Key, Value) -> + case maps:is_key(Value, TupleMap) of + true -> + maps:get(Value, TupleMap); + false -> + Value + end + end, + maps:map(MappingFun, MapToMap).