feat: add mongo_date functions to the rule engine
This commit adds mongo_date built-in functions to the rule engine SQL-like language. Corresponding functions already existed in EMQX 4.4, and this commit ensures that EMQX 5.X also provides them. Fixes: https://emqx.atlassian.net/browse/EMQX-9244
This commit is contained in:
parent
466a28daf2
commit
29584ca721
|
@ -230,6 +230,16 @@
|
|||
date_to_unix_ts/4
|
||||
]).
|
||||
|
||||
%% MongoDB specific date functions. These functions return a date tuple. The
|
||||
%% MongoDB bridge converts such date tuples to a MongoDB date type. The
|
||||
%% following functions are therefore only useful for rules with at least one
|
||||
%% MongoDB action.
|
||||
-export([
|
||||
mongo_date/0,
|
||||
mongo_date/1,
|
||||
mongo_date/2
|
||||
]).
|
||||
|
||||
%% Proc Dict Func
|
||||
-export([
|
||||
proc_dict_get/1,
|
||||
|
@ -1135,3 +1145,21 @@ function_literal(Fun, [FArg | Args]) when is_atom(Fun), is_list(Args) ->
|
|||
) ++ ")";
|
||||
function_literal(Fun, Args) ->
|
||||
{invalid_func, {Fun, Args}}.
|
||||
|
||||
%% @doc Returns the current time as an Erlang timestamp tuple
%% {MegaSecs, Secs, MicroSecs}. The MongoDB bridge converts such tuples
%% to the MongoDB date type (see module comment above the export list).
mongo_date() ->
    erlang:timestamp().
|
||||
|
||||
%% @doc Converts a Unix timestamp given in milliseconds into a
%% {MegaSecs, Secs, MicroSecs} timestamp tuple for the MongoDB bridge.
mongo_date(MillisecondsTimestamp) ->
    convert_timestamp(MillisecondsTimestamp).
|
||||
|
||||
%% @doc Like mongo_date/1, but the timestamp is interpreted in Unit.
%% time_unit/1 (defined elsewhere in this module) maps the rule-SQL unit
%% name to an erlang:convert_time_unit/3 unit; the value is scaled to
%% milliseconds before conversion to a timestamp tuple.
mongo_date(Timestamp, Unit) ->
    InsertedTimeUnit = time_unit(Unit),
    ScaledEpoch = erlang:convert_time_unit(Timestamp, InsertedTimeUnit, millisecond),
    convert_timestamp(ScaledEpoch).
|
||||
|
||||
%% Converts a millisecond Unix timestamp into an Erlang timestamp tuple
%% {MegaSecs, Secs, MicroSecs} (the same shape erlang:timestamp/0 returns).
convert_timestamp(MillisecondsTimestamp) ->
    Micros = MillisecondsTimestamp * 1000,
    MegaSecs = Micros div 1_000_000_000_000,
    Secs = (Micros div 1_000_000) rem 1_000_000,
    MicroSecs = Micros rem 1_000_000,
    {MegaSecs, Secs, MicroSecs}.
|
||||
|
|
|
@ -0,0 +1 @@
|
|
The rule engine SQL-like language now provides three additional built-in functions (mongo_date/0, mongo_date/1 and mongo_date/2) for creating values of the MongoDB date type. These functions are only useful for rules with at least one MongoDB bridge action.
|
|
@ -27,7 +27,8 @@ group_tests() ->
|
|||
t_setup_via_config_and_publish,
|
||||
t_setup_via_http_api_and_publish,
|
||||
t_payload_template,
|
||||
t_collection_template
|
||||
t_collection_template,
|
||||
t_mongo_date_rule_engine_functions
|
||||
].
|
||||
|
||||
groups() ->
|
||||
|
@ -140,10 +141,11 @@ start_apps() ->
|
|||
%% we want to make sure they are loaded before
|
||||
%% ekka start in emqx_common_test_helpers:start_apps/1
|
||||
emqx_common_test_helpers:render_and_load_app_config(emqx_conf),
|
||||
ok = emqx_common_test_helpers:start_apps([emqx_conf, emqx_bridge]).
|
||||
ok = emqx_common_test_helpers:start_apps([emqx_conf, emqx_rule_engine, emqx_bridge]).
|
||||
|
||||
%% Load (but do not start) applications whose modules the suite references,
%% so they are available before ekka starts in
%% emqx_common_test_helpers:start_apps/1.
ensure_loaded() ->
    _ = application:load(emqx_ee_bridge),
    _ = application:load(emqtt),
    %% module_info/0 forces the emqx_ee_bridge module itself to be loaded.
    %% NOTE(review): presumably needed so the bridge's schema/types are
    %% registered before the apps start — confirm against the helpers.
    _ = emqx_ee_bridge:module_info(),
    ok.
|
||||
|
||||
|
@ -282,6 +284,27 @@ find_all(Config) ->
|
|||
ResourceID = emqx_bridge_resource:resource_id(Type, Name),
|
||||
emqx_resource:simple_sync_query(ResourceID, {find, Collection, #{}, #{}}).
|
||||
|
||||
%% Polls the bridge's MongoDB collection until at least one document is
%% present (up to 5 seconds), then returns the final find_all/1 result.
find_all_wait_until_non_empty(Config) ->
    HasDocuments =
        fun() ->
            case find_all(Config) of
                {ok, []} -> false;
                _ -> true
            end
        end,
    wait_until(HasDocuments, 5_000),
    find_all(Config).
|
||||
|
||||
%% Polls Fun() every 100 ms until it returns true or Timeout (ms) elapses.
%% Returns ok on success; raises error(timeout) once the deadline passes.
%% (Previously the exhausted-timeout case fell off the guard and crashed
%% with an opaque function_clause; an explicit timeout error is easier to
%% read in CT logs while still failing the test case.)
wait_until(Fun, Timeout) when Timeout >= 0 ->
    case Fun() of
        true ->
            ok;
        false ->
            timer:sleep(100),
            wait_until(Fun, Timeout - 100)
    end;
wait_until(_Fun, _Timeout) ->
    error(timeout).
|
||||
|
||||
send_message(Config, Payload) ->
|
||||
Name = ?config(mongo_name, Config),
|
||||
Type = mongo_type_bin(?config(mongo_type, Config)),
|
||||
|
@ -376,3 +399,51 @@ t_collection_template(Config) ->
|
|||
find_all(Config)
|
||||
),
|
||||
ok.
|
||||
|
||||
%% Verifies that the rule-SQL built-ins mongo_date/0,1,2 produce timestamp
%% tuples that survive the bridge's payload templating and arrive in MongoDB:
%% a rule selects the three function results, an MQTT publish triggers it,
%% and the inserted document is asserted to contain the expected tuples.
t_mongo_date_rule_engine_functions(Config) ->
    {ok, _} =
        create_bridge(
            Config,
            #{
                <<"payload_template">> =>
                    <<"{\"date_0\": ${date_0}, \"date_1\": ${date_1}, \"date_2\": ${date_2}}">>
            }
        ),
    Type = mongo_type_bin(?config(mongo_type, Config)),
    Name = ?config(mongo_name, Config),
    SQL =
        "SELECT mongo_date() as date_0, mongo_date(1000) as date_1, mongo_date(1, 'second') as date_2 FROM "
        "\"t_mongo_date_rule_engine_functions/topic\"",
    %% Remove rule if it already exists (leftover from a previous run)
    RuleId = <<"rule:t_mongo_date_rule_engine_functions">>,
    emqx_rule_engine:delete_rule(RuleId),
    BridgeId = emqx_bridge_resource:bridge_id(Type, Name),
    %% Reuse RuleId instead of repeating the literal, and match the unused
    %% rule term with _Rule to avoid an unused-variable warning.
    {ok, _Rule} = emqx_rule_engine:create_rule(
        #{
            id => RuleId,
            sql => SQL,
            actions => [
                BridgeId,
                #{function => console}
            ],
            description => <<"to mongo bridge">>
        }
    ),
    %% Publish a message to the rule's topic to trigger the action
    {ok, Client} = emqtt:start_link([{clientid, <<"pub-02">>}, {proto_ver, v5}]),
    {ok, _} = emqtt:connect(Client),
    emqtt:publish(Client, <<"t_mongo_date_rule_engine_functions/topic">>, #{}, <<"{\"x\":1}">>, [
        {qos, 2}
    ]),
    emqtt:stop(Client),
    %% mongo_date(1000) and mongo_date(1, 'second') are both 1 second after
    %% the epoch, i.e. the tuple {0, 1, 0}; mongo_date() is "now", so only
    %% its shape is asserted.
    ?assertMatch(
        {ok, [
            #{
                <<"date_0">> := {_, _, _},
                <<"date_1">> := {0, 1, 0},
                <<"date_2">> := {0, 1, 0}
            }
        ]},
        find_all_wait_until_non_empty(Config)
    ),
    ok.
|
||||
|
|
|
@ -83,5 +83,89 @@ render_message(undefined = _PayloadTemplate, Message) ->
|
|||
%% Renders the payload template against the message and returns a map
%% (MongoDB expects a map as the document, so the rendered result must be
%% JSON-serializable). Delegates to format_data/2, which additionally
%% handles rule-engine MongoDB date tuples before/after JSON decoding.
render_message(PayloadTemplate, Message) ->
    format_data(PayloadTemplate, Message).
|
||||
|
||||
%% The following function was originally copied over from
|
||||
%% https://github.com/emqx/emqx-enterprise/commit/50e3628129720f13f544053600ca1502731e29e0.
|
||||
%% The rule engine has support for producing fields that are date tuples
|
||||
%% (produced by the SQL language's built-in functions mongo_date/0,
|
||||
%% mongo_date/1 and mongo_date/2) which the MongoDB driver recognizes and
|
||||
%% converts to the MongoDB ISODate type
|
||||
%% (https://www.compose.com/articles/understanding-dates-in-compose-mongodb/).
|
||||
%% For this to work we have to replace the tuple values with references, make
|
||||
%% an instance of the template, convert the instance to map with the help of
|
||||
%% emqx_utils_json:decode and then finally replace the references with the
|
||||
%% corresponding tuples in the resulting map.
|
||||
%% Renders PayloadTks (a pre-parsed payload template) against Msg and JSON-
%% decodes the result into a map. Tuple values in Msg (MongoDB date tuples
%% from mongo_date/0,1,2) are not JSON-serializable, so they are swapped for
%% binary references before decoding and restored afterwards.
format_data(PayloadTks, Msg) ->
    % Check the Message for any tuples that need to be extracted before
    % running the template through a JSON parser
    PreparedTupleMap = create_mapping_of_references_to_tuple_values(Msg),
    case maps:size(PreparedTupleMap) of
        % If no tuples were found simply proceed with the json decoding and be done with it
        0 ->
            emqx_utils_json:decode(emqx_plugin_libs_rule:proc_tmpl(PayloadTks, Msg), [return_maps]);
        _ ->
            % If tuples were found, replace the tuple values with the references created, run
            % the modified message through the json parser, and then at the end replace the
            % references with the actual tuple values.
            ProcessedMessage = replace_message_values_with_references(Msg, PreparedTupleMap),
            DecodedMap = emqx_utils_json:decode(
                emqx_plugin_libs_rule:proc_tmpl(PayloadTks, ProcessedMessage), [return_maps]
            ),
            populate_map_with_tuple_values(PreparedTupleMap, DecodedMap)
    end.
|
||||
|
||||
%% For every Reference => OriginalValue pair in TupleMap, replaces each
%% top-level value in RawMessage that equals OriginalValue with the
%% reference, wrapped in escaped quotes so the rendered template stays
%% JSON-serializable. Returns the rewritten message map.
%% NOTE(review): only top-level map values are inspected — tuples nested in
%% sub-maps are not replaced; confirm this matches the template's inputs.
replace_message_values_with_references(RawMessage, TupleMap) ->
    maps:fold(
        fun(Reference, OriginalValue, MsgAcc) ->
            %% Quote the reference once so it survives JSON decoding as a string
            WrappedRef = erlang:iolist_to_binary(io_lib:format("\"~s\"", [Reference])),
            %% Swap in the reference wherever the value matches; keep the rest
            maps:map(
                fun
                    (_Key, Value) when Value == OriginalValue -> WrappedRef;
                    (_Key, Value) -> Value
                end,
                MsgAcc
            )
        end,
        RawMessage,
        TupleMap
    ).
|
||||
|
||||
%% Scans the top-level values of Message and, for each tuple value, creates
%% a unique binary reference (a GUID hex string with a recognizable prefix).
%% Returns a map of Reference => TupleValue; empty if no tuples are present.
create_mapping_of_references_to_tuple_values(Message) ->
    Collect =
        fun
            (_Key, Value, Acc) when is_tuple(Value) ->
                Hex = emqx_guid:to_hexstr(emqx_guid:gen()),
                Acc#{<<"MONGO_DATE_REF_", Hex/binary>> => Value};
            (_Key, _Value, Acc) ->
                Acc
        end,
    maps:fold(Collect, #{}, Message).
|
||||
|
||||
%% Walks MapToMap recursively and replaces every value that is a key of
%% TupleMap (i.e. a previously injected reference string) with the original
%% tuple it stands for. Nested maps are descended into; other values pass
%% through unchanged.
populate_map_with_tuple_values(TupleMap, MapToMap) ->
    maps:map(
        fun
            (_Key, Nested) when is_map(Nested) ->
                populate_map_with_tuple_values(TupleMap, Nested);
            (_Key, Value) ->
                case maps:find(Value, TupleMap) of
                    {ok, Tuple} -> Tuple;
                    error -> Value
                end
        end,
        MapToMap
    ).
|
||||
|
|
Loading…
Reference in New Issue