Merge pull request #10408 from kjellwinblad/kjell/rule_engine/add_missing_mongo_date_functions/EMQX-9244

feat: add mongo_date functions to the rule engine
This commit is contained in:
Kjell Winblad 2023-04-20 09:46:45 +02:00 committed by GitHub
commit 7d3367467a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 188 additions and 4 deletions

View File

@ -231,6 +231,16 @@
date_to_unix_ts/4
]).
%% MongoDB specific date functions. These functions return a date tuple. The
%% MongoDB bridge converts such date tuples to a MongoDB date type. The
%% following functions are therefore only useful for rules with at least one
%% MongoDB action.
-export([
mongo_date/0,
mongo_date/1,
mongo_date/2
]).
%% Proc Dict Func
-export([
proc_dict_get/1,
@ -1139,3 +1149,21 @@ function_literal(Fun, [FArg | Args]) when is_atom(Fun), is_list(Args) ->
) ++ ")";
function_literal(Fun, Args) ->
{invalid_func, {Fun, Args}}.
mongo_date() ->
erlang:timestamp().
mongo_date(MillisecondsTimestamp) ->
convert_timestamp(MillisecondsTimestamp).
mongo_date(Timestamp, Unit) ->
InsertedTimeUnit = time_unit(Unit),
ScaledEpoch = erlang:convert_time_unit(Timestamp, InsertedTimeUnit, millisecond),
convert_timestamp(ScaledEpoch).
convert_timestamp(MillisecondsTimestamp) ->
MicroTimestamp = MillisecondsTimestamp * 1000,
MegaSecs = MicroTimestamp div 1000_000_000_000,
Secs = MicroTimestamp div 1000_000 - MegaSecs * 1000_000,
MicroSecs = MicroTimestamp rem 1000_000,
{MegaSecs, Secs, MicroSecs}.

View File

@ -0,0 +1 @@
The rule engine SQL-like language has got three more built-in functions for creating values of the MongoDB date type. These functions are useful for rules with MongoDB bridge actions only and not supported in other actions.

View File

@ -27,7 +27,8 @@ group_tests() ->
t_setup_via_config_and_publish,
t_setup_via_http_api_and_publish,
t_payload_template,
t_collection_template
t_collection_template,
t_mongo_date_rule_engine_functions
].
groups() ->
@ -140,10 +141,11 @@ start_apps() ->
%% we want to make sure they are loaded before
%% ekka start in emqx_common_test_helpers:start_apps/1
emqx_common_test_helpers:render_and_load_app_config(emqx_conf),
ok = emqx_common_test_helpers:start_apps([emqx_conf, emqx_bridge]).
ok = emqx_common_test_helpers:start_apps([emqx_conf, emqx_rule_engine, emqx_bridge]).
ensure_loaded() ->
_ = application:load(emqx_ee_bridge),
_ = application:load(emqtt),
_ = emqx_ee_bridge:module_info(),
ok.
@ -289,6 +291,27 @@ find_all(Config) ->
ResourceID = emqx_bridge_resource:resource_id(Type, Name),
emqx_resource:simple_sync_query(ResourceID, {find, Collection, #{}, #{}}).
find_all_wait_until_non_empty(Config) ->
wait_until(
fun() ->
case find_all(Config) of
{ok, []} -> false;
_ -> true
end
end,
5_000
),
find_all(Config).
wait_until(Fun, Timeout) when Timeout >= 0 ->
case Fun() of
true ->
ok;
false ->
timer:sleep(100),
wait_until(Fun, Timeout - 100)
end.
send_message(Config, Payload) ->
Name = ?config(mongo_name, Config),
Type = mongo_type_bin(?config(mongo_type, Config)),
@ -383,3 +406,51 @@ t_collection_template(Config) ->
find_all(Config)
),
ok.
t_mongo_date_rule_engine_functions(Config) ->
{ok, _} =
create_bridge(
Config,
#{
<<"payload_template">> =>
<<"{\"date_0\": ${date_0}, \"date_1\": ${date_1}, \"date_2\": ${date_2}}">>
}
),
Type = mongo_type_bin(?config(mongo_type, Config)),
Name = ?config(mongo_name, Config),
SQL =
"SELECT mongo_date() as date_0, mongo_date(1000) as date_1, mongo_date(1, 'second') as date_2 FROM "
"\"t_mongo_date_rule_engine_functions/topic\"",
%% Remove rule if it already exists
RuleId = <<"rule:t_mongo_date_rule_engine_functions">>,
emqx_rule_engine:delete_rule(RuleId),
BridgeId = emqx_bridge_resource:bridge_id(Type, Name),
{ok, Rule} = emqx_rule_engine:create_rule(
#{
id => <<"rule:t_mongo_date_rule_engine_functions">>,
sql => SQL,
actions => [
BridgeId,
#{function => console}
],
description => <<"to mongo bridge">>
}
),
%% Send a message to topic
{ok, Client} = emqtt:start_link([{clientid, <<"pub-02">>}, {proto_ver, v5}]),
{ok, _} = emqtt:connect(Client),
emqtt:publish(Client, <<"t_mongo_date_rule_engine_functions/topic">>, #{}, <<"{\"x\":1}">>, [
{qos, 2}
]),
emqtt:stop(Client),
?assertMatch(
{ok, [
#{
<<"date_0">> := {_, _, _},
<<"date_1">> := {0, 1, 0},
<<"date_2">> := {0, 1, 0}
}
]},
find_all_wait_until_non_empty(Config)
),
ok.

View File

@ -83,5 +83,89 @@ render_message(undefined = _PayloadTemplate, Message) ->
render_message(PayloadTemplate, Message) ->
%% Note: mongo expects a map as a document, so the rendered result
%% must be JSON-serializable
Rendered = emqx_plugin_libs_rule:proc_tmpl(PayloadTemplate, Message),
emqx_utils_json:decode(Rendered, [return_maps]).
format_data(PayloadTemplate, Message).
%% The following function was originally copied over from
%% https://github.com/emqx/emqx-enterprise/commit/50e3628129720f13f544053600ca1502731e29e0.
%% The rule engine has support for producing fields that are date tuples
%% (produced by the SQL language's built-in functions mongo_date/0,
%% mongo_date/1 and mongo_date/2) which the MongoDB driver recognizes and
%% converts to the MongoDB ISODate type
%% (https://www.compose.com/articles/understanding-dates-in-compose-mongodb/).
%% For this to work we have to replace the tuple values with references, make
%% an instance of the template, convert the instance to map with the help of
%% emqx_utils_json:decode and then finally replace the references with the
%% corresponding tuples in the resulting map.
format_data(PayloadTks, Msg) ->
% Check the Message for any tuples that need to be extracted before running the template though a json parser
PreparedTupleMap = create_mapping_of_references_to_tuple_values(Msg),
case maps:size(PreparedTupleMap) of
% If no tuples were found simply proceed with the json decoding and be done with it
0 ->
emqx_utils_json:decode(emqx_plugin_libs_rule:proc_tmpl(PayloadTks, Msg), [return_maps]);
_ ->
% If tuples were found, replace the tuple values with the references created, run
% the modified message through the json parser, and then at the end replace the
% references with the actual tuple values.
ProcessedMessage = replace_message_values_with_references(Msg, PreparedTupleMap),
DecodedMap = emqx_utils_json:decode(
emqx_plugin_libs_rule:proc_tmpl(PayloadTks, ProcessedMessage), [return_maps]
),
populate_map_with_tuple_values(PreparedTupleMap, DecodedMap)
end.
replace_message_values_with_references(RawMessage, TupleMap) ->
% Iterate over every created reference/value pair and inject the reference into the message
maps:fold(
fun(Reference, OriginalValue, OriginalMessage) ->
% Iterate over the Message, which is a map, and look for the element which
% matches the Value in the map which holds the references/original values and replace
% with the reference
maps:fold(
fun(Key, Value, NewMap) ->
case Value == OriginalValue of
true ->
%% Wrap the reference in a string to make it JSON-serializable
StringRef = io_lib:format("\"~s\"", [Reference]),
WrappedRef = erlang:iolist_to_binary(StringRef),
maps:put(Key, WrappedRef, NewMap);
false ->
maps:put(Key, Value, NewMap)
end
end,
#{},
OriginalMessage
)
end,
RawMessage,
TupleMap
).
create_mapping_of_references_to_tuple_values(Message) ->
maps:fold(
fun
(_Key, Value, TupleMap) when is_tuple(Value) ->
Ref0 = emqx_guid:to_hexstr(emqx_guid:gen()),
Ref = <<"MONGO_DATE_REF_", Ref0/binary>>,
maps:put(Ref, Value, TupleMap);
(_Key, _Value, TupleMap) ->
TupleMap
end,
#{},
Message
).
populate_map_with_tuple_values(TupleMap, MapToMap) ->
MappingFun =
fun
(_Key, Value) when is_map(Value) ->
populate_map_with_tuple_values(TupleMap, Value);
(_Key, Value) ->
case maps:is_key(Value, TupleMap) of
true ->
maps:get(Value, TupleMap);
false ->
Value
end
end,
maps:map(MappingFun, MapToMap).