Merge pull request #13199 from thalesmg/message-transformation-r57-20240604
feat: implement message transformation
This commit is contained in:
commit
83ff07ade0
|
@ -25,7 +25,8 @@
|
|||
-define(HP_AUTHN, 970).
|
||||
-define(HP_AUTHZ, 960).
|
||||
-define(HP_SYS_MSGS, 950).
|
||||
-define(HP_MSG_VALIDATION, 945).
|
||||
-define(HP_SCHEMA_VALIDATION, 945).
|
||||
-define(HP_MESSAGE_TRANSFORMATION, 943).
|
||||
-define(HP_TOPIC_METRICS, 940).
|
||||
-define(HP_RETAINER, 930).
|
||||
-define(HP_AUTO_SUB, 920).
|
||||
|
|
|
@ -684,21 +684,28 @@ process_publish(Packet = ?PUBLISH_PACKET(QoS, Topic, PacketId), Channel) ->
|
|||
end.
|
||||
|
||||
packet_to_message(Packet, #channel{
|
||||
conninfo = #{proto_ver := ProtoVer},
|
||||
clientinfo = #{
|
||||
conninfo = #{
|
||||
peername := PeerName,
|
||||
proto_ver := ProtoVer
|
||||
},
|
||||
clientinfo =
|
||||
#{
|
||||
protocol := Protocol,
|
||||
clientid := ClientId,
|
||||
username := Username,
|
||||
peerhost := PeerHost,
|
||||
mountpoint := MountPoint
|
||||
}
|
||||
} = ClientInfo
|
||||
}) ->
|
||||
ClientAttrs = maps:get(client_attrs, ClientInfo, #{}),
|
||||
emqx_mountpoint:mount(
|
||||
MountPoint,
|
||||
emqx_packet:to_message(
|
||||
Packet,
|
||||
ClientId,
|
||||
#{
|
||||
client_attrs => ClientAttrs,
|
||||
peername => PeerName,
|
||||
proto_ver => ProtoVer,
|
||||
protocol => Protocol,
|
||||
username => Username,
|
||||
|
|
|
@ -60,6 +60,7 @@
|
|||
'message.publish',
|
||||
'message.puback',
|
||||
'message.dropped',
|
||||
'message.transformation_failed',
|
||||
'schema.validation_failed',
|
||||
'message.delivered',
|
||||
'message.acked',
|
||||
|
|
|
@ -211,6 +211,10 @@
|
|||
{counter, 'messages.validation_failed'},
|
||||
%% % Messages that passed validations
|
||||
{counter, 'messages.validation_succeeded'},
|
||||
%% % Messages that failed transformations
|
||||
{counter, 'messages.transformation_failed'},
|
||||
%% % Messages that passed transformations
|
||||
{counter, 'messages.transformation_succeeded'},
|
||||
% QoS2 Messages expired
|
||||
{counter, 'messages.dropped.await_pubrel_timeout'},
|
||||
% Messages dropped
|
||||
|
@ -721,4 +725,6 @@ reserved_idx('overload_protection.new_conn') -> 404;
|
|||
reserved_idx('messages.validation_succeeded') -> 405;
|
||||
reserved_idx('messages.validation_failed') -> 406;
|
||||
reserved_idx('messages.persisted') -> 407;
|
||||
reserved_idx('messages.transformation_succeeded') -> 408;
|
||||
reserved_idx('messages.transformation_failed') -> 409;
|
||||
reserved_idx(_) -> undefined.
|
||||
|
|
|
@ -385,6 +385,8 @@ default_appspec(emqx_schema_registry, _SuiteOpts) ->
|
|||
#{schema_mod => emqx_schema_registry_schema, config => #{}};
|
||||
default_appspec(emqx_schema_validation, _SuiteOpts) ->
|
||||
#{schema_mod => emqx_schema_validation_schema, config => #{}};
|
||||
default_appspec(emqx_message_transformation, _SuiteOpts) ->
|
||||
#{schema_mod => emqx_message_transformation_schema, config => #{}};
|
||||
default_appspec(_, _) ->
|
||||
#{}.
|
||||
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
{application, emqx_conf, [
|
||||
{description, "EMQX configuration management"},
|
||||
{vsn, "0.2.0"},
|
||||
{vsn, "0.2.1"},
|
||||
{registered, []},
|
||||
{mod, {emqx_conf_app, []}},
|
||||
{applications, [kernel, stdlib]},
|
||||
|
|
|
@ -37,6 +37,7 @@
|
|||
-define(AUDIT_MOD, audit).
|
||||
-define(UPDATE_READONLY_KEYS_PROHIBITED, <<"Cannot update read-only key '~s'.">>).
|
||||
-define(SCHEMA_VALIDATION_CONF_ROOT_BIN, <<"schema_validation">>).
|
||||
-define(MESSAGE_TRANSFORMATION_CONF_ROOT_BIN, <<"message_transformation">>).
|
||||
|
||||
-dialyzer({no_match, [load/0]}).
|
||||
|
||||
|
@ -335,6 +336,14 @@ update_config_cluster(?SCHEMA_VALIDATION_CONF_ROOT_BIN = Key, NewConf, #{mode :=
|
|||
check_res(Key, emqx_conf:update([Key], {merge, NewConf}, ?OPTIONS), NewConf, Opts);
|
||||
update_config_cluster(?SCHEMA_VALIDATION_CONF_ROOT_BIN = Key, NewConf, #{mode := replace} = Opts) ->
|
||||
check_res(Key, emqx_conf:update([Key], {replace, NewConf}, ?OPTIONS), NewConf, Opts);
|
||||
update_config_cluster(
|
||||
?MESSAGE_TRANSFORMATION_CONF_ROOT_BIN = Key, NewConf, #{mode := merge} = Opts
|
||||
) ->
|
||||
check_res(Key, emqx_conf:update([Key], {merge, NewConf}, ?OPTIONS), NewConf, Opts);
|
||||
update_config_cluster(
|
||||
?MESSAGE_TRANSFORMATION_CONF_ROOT_BIN = Key, NewConf, #{mode := replace} = Opts
|
||||
) ->
|
||||
check_res(Key, emqx_conf:update([Key], {replace, NewConf}, ?OPTIONS), NewConf, Opts);
|
||||
update_config_cluster(Key, NewConf, #{mode := merge} = Opts) ->
|
||||
Merged = merge_conf(Key, NewConf),
|
||||
check_res(Key, emqx_conf:update([Key], Merged, ?OPTIONS), NewConf, Opts);
|
||||
|
|
|
@ -67,6 +67,8 @@
|
|||
%, sent_bytes
|
||||
validation_succeeded,
|
||||
validation_failed,
|
||||
transformation_succeeded,
|
||||
transformation_failed,
|
||||
dropped,
|
||||
persisted
|
||||
]).
|
||||
|
@ -90,6 +92,8 @@
|
|||
sent => sent_msg_rate,
|
||||
validation_succeeded => validation_succeeded_rate,
|
||||
validation_failed => validation_failed_rate,
|
||||
transformation_succeeded => transformation_succeeded_rate,
|
||||
transformation_failed => transformation_failed_rate,
|
||||
dropped => dropped_msg_rate,
|
||||
persisted => persisted_rate
|
||||
}).
|
||||
|
|
|
@ -474,6 +474,10 @@ stats(validation_succeeded) ->
|
|||
emqx_metrics:val('messages.validation_succeeded');
|
||||
stats(validation_failed) ->
|
||||
emqx_metrics:val('messages.validation_failed');
|
||||
stats(transformation_succeeded) ->
|
||||
emqx_metrics:val('messages.transformation_succeeded');
|
||||
stats(transformation_failed) ->
|
||||
emqx_metrics:val('messages.transformation_failed');
|
||||
stats(dropped) ->
|
||||
emqx_metrics:val('messages.dropped');
|
||||
stats(persisted) ->
|
||||
|
|
|
@ -198,6 +198,10 @@ swagger_desc(validation_succeeded) ->
|
|||
swagger_desc_format("Schema validations succeeded ");
|
||||
swagger_desc(validation_failed) ->
|
||||
swagger_desc_format("Schema validations failed ");
|
||||
swagger_desc(transformation_succeeded) ->
|
||||
swagger_desc_format("Message transformations succeeded ");
|
||||
swagger_desc(transformation_failed) ->
|
||||
swagger_desc_format("Message transformations failed ");
|
||||
swagger_desc(persisted) ->
|
||||
swagger_desc_format("Messages saved to the durable storage ");
|
||||
swagger_desc(disconnected_durable_sessions) ->
|
||||
|
@ -230,6 +234,10 @@ swagger_desc(validation_succeeded_rate) ->
|
|||
swagger_desc_format("Schema validations succeeded ", per);
|
||||
swagger_desc(validation_failed_rate) ->
|
||||
swagger_desc_format("Schema validations failed ", per);
|
||||
swagger_desc(transformation_succeeded_rate) ->
|
||||
swagger_desc_format("Message transformations succeeded ", per);
|
||||
swagger_desc(transformation_failed_rate) ->
|
||||
swagger_desc_format("Message transformations failed ", per);
|
||||
swagger_desc(persisted_rate) ->
|
||||
swagger_desc_format("Messages saved to the durable storage ", per);
|
||||
swagger_desc(retained_msg_count) ->
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
{application, emqx_enterprise, [
|
||||
{description, "EMQX Enterprise Edition"},
|
||||
{vsn, "0.2.0"},
|
||||
{vsn, "0.2.1"},
|
||||
{registered, []},
|
||||
{applications, [
|
||||
kernel,
|
||||
|
|
|
@ -16,6 +16,7 @@
|
|||
emqx_license_schema,
|
||||
emqx_schema_registry_schema,
|
||||
emqx_schema_validation_schema,
|
||||
emqx_message_transformation_schema,
|
||||
emqx_ft_schema
|
||||
]).
|
||||
|
||||
|
@ -196,6 +197,7 @@ audit_log_conf() ->
|
|||
|
||||
tr_prometheus_collectors(Conf) ->
|
||||
[
|
||||
{'/prometheus/schema_validation', emqx_prometheus_schema_validation}
|
||||
{'/prometheus/schema_validation', emqx_prometheus_schema_validation},
|
||||
{'/prometheus/message_transformation', emqx_prometheus_message_transformation}
|
||||
| emqx_conf_schema:tr_prometheus_collectors(Conf)
|
||||
].
|
||||
|
|
|
@ -89,6 +89,7 @@
|
|||
emqx_license,
|
||||
emqx_enterprise,
|
||||
emqx_schema_validation,
|
||||
emqx_message_transformation,
|
||||
emqx_connector_aggregator,
|
||||
emqx_bridge_kafka,
|
||||
emqx_bridge_pulsar,
|
||||
|
|
|
@ -3,7 +3,7 @@
|
|||
{id, "emqx_machine"},
|
||||
{description, "The EMQX Machine"},
|
||||
% strict semver, bump manually!
|
||||
{vsn, "0.3.0"},
|
||||
{vsn, "0.3.1"},
|
||||
{modules, []},
|
||||
{registered, []},
|
||||
{applications, [kernel, stdlib, emqx_ctl]},
|
||||
|
|
|
@ -0,0 +1,94 @@
|
|||
Business Source License 1.1
|
||||
|
||||
Licensor: Hangzhou EMQ Technologies Co., Ltd.
|
||||
Licensed Work: EMQX Enterprise Edition
|
||||
The Licensed Work is (c) 2024
|
||||
Hangzhou EMQ Technologies Co., Ltd.
|
||||
Additional Use Grant: Students and educators are granted right to copy,
|
||||
modify, and create derivative work for research
|
||||
or education.
|
||||
Change Date: 2028-06-05
|
||||
Change License: Apache License, Version 2.0
|
||||
|
||||
For information about alternative licensing arrangements for the Software,
|
||||
please contact Licensor: https://www.emqx.com/en/contact
|
||||
|
||||
Notice
|
||||
|
||||
The Business Source License (this document, or the “License”) is not an Open
|
||||
Source license. However, the Licensed Work will eventually be made available
|
||||
under an Open Source License, as stated in this License.
|
||||
|
||||
License text copyright (c) 2017 MariaDB Corporation Ab, All Rights Reserved.
|
||||
“Business Source License” is a trademark of MariaDB Corporation Ab.
|
||||
|
||||
-----------------------------------------------------------------------------
|
||||
|
||||
Business Source License 1.1
|
||||
|
||||
Terms
|
||||
|
||||
The Licensor hereby grants you the right to copy, modify, create derivative
|
||||
works, redistribute, and make non-production use of the Licensed Work. The
|
||||
Licensor may make an Additional Use Grant, above, permitting limited
|
||||
production use.
|
||||
|
||||
Effective on the Change Date, or the fourth anniversary of the first publicly
|
||||
available distribution of a specific version of the Licensed Work under this
|
||||
License, whichever comes first, the Licensor hereby grants you rights under
|
||||
the terms of the Change License, and the rights granted in the paragraph
|
||||
above terminate.
|
||||
|
||||
If your use of the Licensed Work does not comply with the requirements
|
||||
currently in effect as described in this License, you must purchase a
|
||||
commercial license from the Licensor, its affiliated entities, or authorized
|
||||
resellers, or you must refrain from using the Licensed Work.
|
||||
|
||||
All copies of the original and modified Licensed Work, and derivative works
|
||||
of the Licensed Work, are subject to this License. This License applies
|
||||
separately for each version of the Licensed Work and the Change Date may vary
|
||||
for each version of the Licensed Work released by Licensor.
|
||||
|
||||
You must conspicuously display this License on each original or modified copy
|
||||
of the Licensed Work. If you receive the Licensed Work in original or
|
||||
modified form from a third party, the terms and conditions set forth in this
|
||||
License apply to your use of that work.
|
||||
|
||||
Any use of the Licensed Work in violation of this License will automatically
|
||||
terminate your rights under this License for the current and all other
|
||||
versions of the Licensed Work.
|
||||
|
||||
This License does not grant you any right in any trademark or logo of
|
||||
Licensor or its affiliates (provided that you may use a trademark or logo of
|
||||
Licensor as expressly required by this License).
|
||||
|
||||
TO THE EXTENT PERMITTED BY APPLICABLE LAW, THE LICENSED WORK IS PROVIDED ON
|
||||
AN “AS IS” BASIS. LICENSOR HEREBY DISCLAIMS ALL WARRANTIES AND CONDITIONS,
|
||||
EXPRESS OR IMPLIED, INCLUDING (WITHOUT LIMITATION) WARRANTIES OF
|
||||
MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, NON-INFRINGEMENT, AND
|
||||
TITLE.
|
||||
|
||||
MariaDB hereby grants you permission to use this License’s text to license
|
||||
your works, and to refer to it using the trademark “Business Source License”,
|
||||
as long as you comply with the Covenants of Licensor below.
|
||||
|
||||
Covenants of Licensor
|
||||
|
||||
In consideration of the right to use this License’s text and the “Business
|
||||
Source License” name and trademark, Licensor covenants to MariaDB, and to all
|
||||
other recipients of the licensed work to be provided by Licensor:
|
||||
|
||||
1. To specify as the Change License the GPL Version 2.0 or any later version,
|
||||
or a license that is compatible with GPL Version 2.0 or a later version,
|
||||
where “compatible” means that software provided under the Change License can
|
||||
be included in a program with software provided under GPL Version 2.0 or a
|
||||
later version. Licensor may specify additional Change Licenses without
|
||||
limitation.
|
||||
|
||||
2. To either: (a) specify an additional grant of rights to use that does not
|
||||
impose any additional restriction on the right granted in this License, as
|
||||
the Additional Use Grant; or (b) insert the text “None”.
|
||||
|
||||
3. To specify a Change Date.
|
||||
|
||||
4. Not to modify this License in any other way.
|
|
@ -0,0 +1,29 @@
|
|||
# EMQX Message Transformation
|
||||
|
||||
This application encapsulates the functionality to transform incoming or internally
|
||||
triggered published payloads and take an action upon failure, which can be to just drop
|
||||
the message without further processing, or to disconnect the offending client as well.
|
||||
|
||||
# Documentation
|
||||
|
||||
Refer to [Message
|
||||
Transformation](https://docs.emqx.com/en/enterprise/latest/data-integration/message-transformation.html)
|
||||
for more information about the semantics.
|
||||
|
||||
# HTTP APIs
|
||||
|
||||
APIs are provided for transformation management, which includes creating,
|
||||
updating, looking up, deleting, listing transformations.
|
||||
|
||||
Refer to [API Docs -
|
||||
Bridges](https://docs.emqx.com/en/enterprise/latest/admin/api-docs.html#tag/Message-Transformation)
|
||||
for more detailed information.
|
||||
|
||||
|
||||
# Contributing
|
||||
|
||||
Please see our [contributing.md](../../CONTRIBUTING.md).
|
||||
|
||||
# License
|
||||
|
||||
EMQ Business Source License 1.1, refer to [LICENSE](BSL.txt).
|
|
@ -0,0 +1,15 @@
|
|||
%% -*- mode: erlang -*-
|
||||
|
||||
{erl_opts, [
|
||||
warn_unused_vars,
|
||||
warn_shadow_vars,
|
||||
warn_unused_import,
|
||||
warn_obsolete_guard,
|
||||
warnings_as_errors,
|
||||
debug_info
|
||||
]}.
|
||||
{deps, [
|
||||
{emqx, {path, "../emqx"}},
|
||||
{emqx_utils, {path, "../emqx_utils"}},
|
||||
{emqx_schema_registry, {path, "../emqx_schema_registry"}}
|
||||
]}.
|
|
@ -0,0 +1,15 @@
|
|||
{application, emqx_message_transformation, [
|
||||
{description, "EMQX Message Transformation"},
|
||||
{vsn, "0.1.0"},
|
||||
{registered, [emqx_message_transformation_sup, emqx_message_transformation_registry]},
|
||||
{mod, {emqx_message_transformation_app, []}},
|
||||
{applications, [
|
||||
kernel,
|
||||
stdlib,
|
||||
emqx
|
||||
]},
|
||||
{env, []},
|
||||
{modules, []},
|
||||
|
||||
{links, []}
|
||||
]}.
|
|
@ -0,0 +1,504 @@
|
|||
%%--------------------------------------------------------------------
|
||||
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||
%%--------------------------------------------------------------------
|
||||
-module(emqx_message_transformation).
|
||||
|
||||
-include_lib("snabbkaffe/include/trace.hrl").
|
||||
-include_lib("emqx_utils/include/emqx_message.hrl").
|
||||
-include_lib("emqx/include/emqx_hooks.hrl").
|
||||
-include_lib("emqx/include/logger.hrl").
|
||||
|
||||
%% API
|
||||
-export([
|
||||
list/0,
|
||||
reorder/1,
|
||||
lookup/1,
|
||||
insert/1,
|
||||
update/1,
|
||||
delete/1
|
||||
]).
|
||||
|
||||
%% `emqx_hooks' API
|
||||
-export([
|
||||
register_hooks/0,
|
||||
unregister_hooks/0,
|
||||
|
||||
on_message_publish/1
|
||||
]).
|
||||
|
||||
%%------------------------------------------------------------------------------
|
||||
%% Type declarations
|
||||
%%------------------------------------------------------------------------------
|
||||
|
||||
-define(TRACE_TAG, "MESSAGE_TRANSFORMATION").
|
||||
-define(CONF_ROOT, message_transformation).
|
||||
-define(CONF_ROOT_BIN, <<"message_transformation">>).
|
||||
-define(TRANSFORMATIONS_CONF_PATH, [?CONF_ROOT, transformations]).
|
||||
|
||||
-type transformation_name() :: binary().
|
||||
%% TODO: make more specific typespec
|
||||
-type transformation() :: #{atom() => term()}.
|
||||
%% TODO: make more specific typespec
|
||||
-type variform() :: any().
|
||||
-type operation() :: #{key := [binary(), ...], value := variform()}.
|
||||
-type qos() :: 0..2.
|
||||
-type rendered_value() :: qos() | boolean() | binary().
|
||||
|
||||
-type eval_context() :: #{
|
||||
client_attrs := map(),
|
||||
payload := _,
|
||||
qos := _,
|
||||
retain := _,
|
||||
topic := _,
|
||||
user_property := _,
|
||||
dirty := #{
|
||||
payload => true,
|
||||
qos => true,
|
||||
retain => true,
|
||||
topic => true,
|
||||
user_property => true
|
||||
}
|
||||
}.
|
||||
|
||||
-export_type([
|
||||
transformation/0,
|
||||
transformation_name/0
|
||||
]).
|
||||
|
||||
%%------------------------------------------------------------------------------
|
||||
%% API
|
||||
%%------------------------------------------------------------------------------
|
||||
|
||||
-spec list() -> [transformation()].
|
||||
list() ->
|
||||
emqx_message_transformation_config:list().
|
||||
|
||||
-spec reorder([transformation_name()]) ->
|
||||
{ok, _} | {error, _}.
|
||||
reorder(Order) ->
|
||||
emqx_message_transformation_config:reorder(Order).
|
||||
|
||||
-spec lookup(transformation_name()) -> {ok, transformation()} | {error, not_found}.
|
||||
lookup(Name) ->
|
||||
emqx_message_transformation_config:lookup(Name).
|
||||
|
||||
-spec insert(transformation()) ->
|
||||
{ok, _} | {error, _}.
|
||||
insert(Transformation) ->
|
||||
emqx_message_transformation_config:insert(Transformation).
|
||||
|
||||
-spec update(transformation()) ->
|
||||
{ok, _} | {error, _}.
|
||||
update(Transformation) ->
|
||||
emqx_message_transformation_config:update(Transformation).
|
||||
|
||||
-spec delete(transformation_name()) ->
|
||||
{ok, _} | {error, _}.
|
||||
delete(Name) ->
|
||||
emqx_message_transformation_config:delete(Name).
|
||||
|
||||
%%------------------------------------------------------------------------------
|
||||
%% Hooks
|
||||
%%------------------------------------------------------------------------------
|
||||
|
||||
-spec register_hooks() -> ok.
|
||||
register_hooks() ->
|
||||
emqx_hooks:put(
|
||||
'message.publish', {?MODULE, on_message_publish, []}, ?HP_MESSAGE_TRANSFORMATION
|
||||
).
|
||||
|
||||
-spec unregister_hooks() -> ok.
|
||||
unregister_hooks() ->
|
||||
emqx_hooks:del('message.publish', {?MODULE, on_message_publish}).
|
||||
|
||||
-spec on_message_publish(emqx_types:message()) ->
|
||||
{ok, emqx_types:message()} | {stop, emqx_types:message()}.
|
||||
on_message_publish(Message = #message{topic = Topic}) ->
|
||||
case emqx_message_transformation_registry:matching_transformations(Topic) of
|
||||
[] ->
|
||||
ok;
|
||||
Transformations ->
|
||||
run_transformations(Transformations, Message)
|
||||
end.
|
||||
|
||||
%%------------------------------------------------------------------------------
|
||||
%% Internal exports
|
||||
%%------------------------------------------------------------------------------
|
||||
|
||||
%%------------------------------------------------------------------------------
|
||||
%% Internal functions
|
||||
%%------------------------------------------------------------------------------
|
||||
|
||||
-spec eval_operation(operation(), transformation(), eval_context()) -> {ok, eval_context()} | error.
|
||||
eval_operation(Operation, Transformation, Context) ->
|
||||
#{key := K, value := V} = Operation,
|
||||
case eval_variform(K, V, Context) of
|
||||
{error, Reason} ->
|
||||
trace_failure(Transformation, "transformation_eval_operation_failure", #{
|
||||
reason => Reason
|
||||
}),
|
||||
error;
|
||||
{ok, Rendered} ->
|
||||
NewContext = put_value(K, Rendered, Context),
|
||||
{ok, NewContext}
|
||||
end.
|
||||
|
||||
-spec eval_variform([binary(), ...], _, eval_context()) ->
|
||||
{ok, rendered_value()} | {error, term()}.
|
||||
eval_variform(K, V, Context) ->
|
||||
Opts =
|
||||
case K of
|
||||
[<<"payload">> | _] ->
|
||||
#{eval_as_string => false};
|
||||
_ ->
|
||||
#{}
|
||||
end,
|
||||
case emqx_variform:render(V, Context, Opts) of
|
||||
{error, Reason} ->
|
||||
{error, Reason};
|
||||
{ok, Rendered} ->
|
||||
map_result(Rendered, K)
|
||||
end.
|
||||
|
||||
-spec put_value([binary(), ...], rendered_value(), eval_context()) -> eval_context().
|
||||
put_value([<<"payload">> | Rest], Rendered, Context0) ->
|
||||
Context = maps:update_with(dirty, fun(D) -> D#{payload => true} end, Context0),
|
||||
maps:update_with(
|
||||
payload,
|
||||
fun(P) ->
|
||||
case Rest of
|
||||
[] ->
|
||||
Rendered;
|
||||
_ ->
|
||||
emqx_utils_maps:deep_put(Rest, P, Rendered)
|
||||
end
|
||||
end,
|
||||
Context
|
||||
);
|
||||
put_value([<<"user_property">>, Key], Rendered, Context0) ->
|
||||
Context = maps:update_with(dirty, fun(D) -> D#{user_property => true} end, Context0),
|
||||
maps:update_with(
|
||||
user_property,
|
||||
fun(Ps) -> lists:keystore(Key, 1, Ps, {Key, Rendered}) end,
|
||||
Context
|
||||
);
|
||||
put_value([<<"qos">>], Rendered, Context0) ->
|
||||
Context = maps:update_with(dirty, fun(D) -> D#{qos => true} end, Context0),
|
||||
Context#{qos := Rendered};
|
||||
put_value([<<"retain">>], Rendered, Context0) ->
|
||||
Context = maps:update_with(dirty, fun(D) -> D#{retain => true} end, Context0),
|
||||
Context#{retain := Rendered};
|
||||
put_value([<<"topic">>], Rendered, Context0) ->
|
||||
Context = maps:update_with(dirty, fun(D) -> D#{topic => true} end, Context0),
|
||||
Context#{topic := Rendered}.
|
||||
|
||||
-spec map_result(binary(), [binary(), ...]) ->
|
||||
{ok, 0..2 | boolean() | binary()} | {error, map()}.
|
||||
map_result(QoSBin, [<<"qos">>]) ->
|
||||
case QoSBin of
|
||||
<<"0">> -> {ok, 0};
|
||||
<<"1">> -> {ok, 1};
|
||||
<<"2">> -> {ok, 2};
|
||||
_ -> {error, #{reason => bad_qos_value, input => QoSBin}}
|
||||
end;
|
||||
map_result(RetainBin, [<<"retain">>]) ->
|
||||
case RetainBin of
|
||||
<<"true">> -> {ok, true};
|
||||
<<"false">> -> {ok, false};
|
||||
_ -> {error, #{reason => bad_retain_value, input => RetainBin}}
|
||||
end;
|
||||
map_result(Rendered, _Key) ->
|
||||
{ok, Rendered}.
|
||||
|
||||
run_transformations(Transformations, Message = #message{headers = Headers}) ->
|
||||
case do_run_transformations(Transformations, Message) of
|
||||
#message{} = FinalMessage ->
|
||||
emqx_metrics:inc('messages.transformation_succeeded'),
|
||||
{ok, FinalMessage};
|
||||
drop ->
|
||||
emqx_metrics:inc('messages.transformation_failed'),
|
||||
{stop, Message#message{headers = Headers#{allow_publish => false}}};
|
||||
disconnect ->
|
||||
emqx_metrics:inc('messages.transformation_failed'),
|
||||
{stop, Message#message{
|
||||
headers = Headers#{
|
||||
allow_publish => false,
|
||||
should_disconnect => true
|
||||
}
|
||||
}}
|
||||
end.
|
||||
|
||||
do_run_transformations(Transformations, Message) ->
|
||||
Fun = fun(Transformation, MessageAcc) ->
|
||||
#{name := Name} = Transformation,
|
||||
emqx_message_transformation_registry:inc_matched(Name),
|
||||
case run_transformation(Transformation, MessageAcc) of
|
||||
#message{} = NewAcc ->
|
||||
emqx_message_transformation_registry:inc_succeeded(Name),
|
||||
{cont, NewAcc};
|
||||
ignore ->
|
||||
emqx_message_transformation_registry:inc_failed(Name),
|
||||
run_message_transformation_failed_hook(Message, Transformation),
|
||||
{cont, MessageAcc};
|
||||
FailureAction ->
|
||||
trace_failure(Transformation, "transformation_failed", #{
|
||||
transformation => Name,
|
||||
action => FailureAction
|
||||
}),
|
||||
emqx_message_transformation_registry:inc_failed(Name),
|
||||
run_message_transformation_failed_hook(Message, Transformation),
|
||||
{halt, FailureAction}
|
||||
end
|
||||
end,
|
||||
case emqx_utils:foldl_while(Fun, Message, Transformations) of
|
||||
#message{} = FinalMessage ->
|
||||
case is_payload_properly_encoded(FinalMessage) of
|
||||
true ->
|
||||
FinalMessage;
|
||||
false ->
|
||||
%% Take the last validation's failure action, as it's the one
|
||||
%% responsible for getting the right encoding.
|
||||
LastTransformation = lists:last(Transformations),
|
||||
#{failure_action := FailureAction} = LastTransformation,
|
||||
trace_failure(LastTransformation, "transformation_bad_encoding", #{
|
||||
action => FailureAction,
|
||||
explain => <<"final payload must be encoded as a binary">>
|
||||
}),
|
||||
FailureAction
|
||||
end;
|
||||
FailureAction ->
|
||||
FailureAction
|
||||
end.
|
||||
|
||||
run_transformation(Transformation, MessageIn) ->
|
||||
#{
|
||||
operations := Operations,
|
||||
failure_action := FailureAction,
|
||||
payload_decoder := PayloadDecoder
|
||||
} = Transformation,
|
||||
Fun = fun(Operation, Acc) ->
|
||||
case eval_operation(Operation, Transformation, Acc) of
|
||||
{ok, NewAcc} -> {cont, NewAcc};
|
||||
error -> {halt, FailureAction}
|
||||
end
|
||||
end,
|
||||
PayloadIn = MessageIn#message.payload,
|
||||
case decode(PayloadIn, PayloadDecoder, Transformation) of
|
||||
{ok, InitPayload} ->
|
||||
InitAcc = message_to_context(MessageIn, InitPayload, Transformation),
|
||||
case emqx_utils:foldl_while(Fun, InitAcc, Operations) of
|
||||
#{} = ContextOut ->
|
||||
context_to_message(MessageIn, ContextOut, Transformation);
|
||||
_ ->
|
||||
FailureAction
|
||||
end;
|
||||
error ->
|
||||
%% Error already logged
|
||||
FailureAction
|
||||
end.
|
||||
|
||||
-spec message_to_context(emqx_types:message(), _Payload, transformation()) -> eval_context().
|
||||
message_to_context(#message{} = Message, Payload, Transformation) ->
|
||||
#{
|
||||
payload_decoder := PayloadDecoder,
|
||||
payload_encoder := PayloadEncoder
|
||||
} = Transformation,
|
||||
Dirty =
|
||||
case PayloadEncoder =:= PayloadDecoder of
|
||||
true -> #{};
|
||||
false -> #{payload => true}
|
||||
end,
|
||||
#{
|
||||
dirty => Dirty,
|
||||
client_attrs => emqx_message:get_header(client_attrs, Message, #{}),
|
||||
payload => Payload,
|
||||
qos => Message#message.qos,
|
||||
retain => emqx_message:get_flag(retain, Message, false),
|
||||
topic => Message#message.topic,
|
||||
user_property => maps:get(
|
||||
'User-Property', emqx_message:get_header(properties, Message, #{}), []
|
||||
)
|
||||
}.
|
||||
|
||||
-spec context_to_message(emqx_types:message(), eval_context(), transformation()) ->
|
||||
{ok, emqx_types:message()} | _TODO.
|
||||
context_to_message(Message, Context, Transformation) ->
|
||||
#{
|
||||
failure_action := FailureAction,
|
||||
payload_encoder := PayloadEncoder
|
||||
} = Transformation,
|
||||
#{payload := PayloadOut} = Context,
|
||||
case encode(PayloadOut, PayloadEncoder, Transformation) of
|
||||
{ok, Payload} ->
|
||||
take_from_context(Context#{payload := Payload}, Message);
|
||||
error ->
|
||||
FailureAction
|
||||
end.
|
||||
|
||||
take_from_context(Context, Message) ->
|
||||
maps:fold(
|
||||
fun
|
||||
(payload, _, Acc) ->
|
||||
Acc#message{payload = maps:get(payload, Context)};
|
||||
(qos, _, Acc) ->
|
||||
Acc#message{qos = maps:get(qos, Context)};
|
||||
(topic, _, Acc) ->
|
||||
Acc#message{topic = maps:get(topic, Context)};
|
||||
(retain, _, Acc) ->
|
||||
emqx_message:set_flag(retain, maps:get(retain, Context), Acc);
|
||||
(user_property, _, Acc) ->
|
||||
Props0 = emqx_message:get_header(properties, Acc, #{}),
|
||||
Props = maps:merge(Props0, #{'User-Property' => maps:get(user_property, Context)}),
|
||||
emqx_message:set_header(properties, Props, Acc)
|
||||
end,
|
||||
Message,
|
||||
maps:get(dirty, Context)
|
||||
).
|
||||
|
||||
decode(Payload, #{type := none}, _Transformation) ->
|
||||
{ok, Payload};
|
||||
decode(Payload, #{type := json}, Transformation) ->
|
||||
case emqx_utils_json:safe_decode(Payload, [return_maps]) of
|
||||
{ok, JSON} ->
|
||||
{ok, JSON};
|
||||
{error, Reason} ->
|
||||
trace_failure(Transformation, "payload_decode_failed", #{
|
||||
decoder => json,
|
||||
reason => Reason
|
||||
}),
|
||||
error
|
||||
end;
|
||||
decode(Payload, #{type := avro, schema := SerdeName}, Transformation) ->
|
||||
try
|
||||
{ok, emqx_schema_registry_serde:decode(SerdeName, Payload)}
|
||||
catch
|
||||
error:{serde_not_found, _} ->
|
||||
trace_failure(Transformation, "payload_decode_schema_not_found", #{
|
||||
decoder => avro,
|
||||
schema_name => SerdeName
|
||||
}),
|
||||
error;
|
||||
Class:Error:Stacktrace ->
|
||||
trace_failure(Transformation, "payload_decode_schema_failure", #{
|
||||
decoder => avro,
|
||||
schema_name => SerdeName,
|
||||
kind => Class,
|
||||
reason => Error,
|
||||
stacktrace => Stacktrace
|
||||
}),
|
||||
error
|
||||
end;
|
||||
decode(
|
||||
Payload, #{type := protobuf, schema := SerdeName, message_type := MessageType}, Transformation
|
||||
) ->
|
||||
try
|
||||
{ok, emqx_schema_registry_serde:decode(SerdeName, Payload, [MessageType])}
|
||||
catch
|
||||
error:{serde_not_found, _} ->
|
||||
trace_failure(Transformation, "payload_decode_schema_not_found", #{
|
||||
decoder => protobuf,
|
||||
schema_name => SerdeName,
|
||||
message_type => MessageType
|
||||
}),
|
||||
error;
|
||||
Class:Error:Stacktrace ->
|
||||
trace_failure(Transformation, "payload_decode_schema_failure", #{
|
||||
decoder => protobuf,
|
||||
schema_name => SerdeName,
|
||||
message_type => MessageType,
|
||||
kind => Class,
|
||||
reason => Error,
|
||||
stacktrace => Stacktrace
|
||||
}),
|
||||
error
|
||||
end.
|
||||
|
||||
encode(Payload, #{type := none}, _Transformation) ->
|
||||
{ok, Payload};
|
||||
encode(Payload, #{type := json}, Transformation) ->
|
||||
case emqx_utils_json:safe_encode(Payload) of
|
||||
{ok, Bin} ->
|
||||
{ok, Bin};
|
||||
{error, Reason} ->
|
||||
trace_failure(Transformation, "payload_encode_failed", #{
|
||||
encoder => json,
|
||||
reason => Reason
|
||||
}),
|
||||
error
|
||||
end;
|
||||
encode(Payload, #{type := avro, schema := SerdeName}, Transformation) ->
|
||||
try
|
||||
{ok, emqx_schema_registry_serde:encode(SerdeName, Payload)}
|
||||
catch
|
||||
error:{serde_not_found, _} ->
|
||||
trace_failure(Transformation, "payload_encode_schema_not_found", #{
|
||||
encoder => avro,
|
||||
schema_name => SerdeName
|
||||
}),
|
||||
error;
|
||||
Class:Error:Stacktrace ->
|
||||
trace_failure(Transformation, "payload_encode_schema_failure", #{
|
||||
encoder => avro,
|
||||
schema_name => SerdeName,
|
||||
kind => Class,
|
||||
reason => Error,
|
||||
stacktrace => Stacktrace
|
||||
}),
|
||||
error
|
||||
end;
|
||||
encode(
|
||||
Payload, #{type := protobuf, schema := SerdeName, message_type := MessageType}, Transformation
|
||||
) ->
|
||||
try
|
||||
{ok, emqx_schema_registry_serde:encode(SerdeName, Payload, [MessageType])}
|
||||
catch
|
||||
error:{serde_not_found, _} ->
|
||||
trace_failure(Transformation, "payload_encode_schema_not_found", #{
|
||||
encoder => protobuf,
|
||||
schema_name => SerdeName,
|
||||
message_type => MessageType
|
||||
}),
|
||||
error;
|
||||
Class:Error:Stacktrace ->
|
||||
trace_failure(Transformation, "payload_encode_schema_failure", #{
|
||||
encoder => protobuf,
|
||||
schema_name => SerdeName,
|
||||
message_type => MessageType,
|
||||
kind => Class,
|
||||
reason => Error,
|
||||
stacktrace => Stacktrace
|
||||
}),
|
||||
error
|
||||
end.
|
||||
|
||||
trace_failure(#{log_failure := #{level := none}} = Transformation, _Msg, _Meta) ->
|
||||
#{
|
||||
name := _Name,
|
||||
failure_action := _Action
|
||||
} = Transformation,
|
||||
?tp(message_transformation_failed, _Meta#{log_level => none, name => _Name, message => _Msg}),
|
||||
ok;
|
||||
trace_failure(#{log_failure := #{level := Level}} = Transformation, Msg, Meta0) ->
|
||||
#{
|
||||
name := Name,
|
||||
failure_action := _Action
|
||||
} = Transformation,
|
||||
Meta = maps:merge(#{name => Name}, Meta0),
|
||||
?tp(message_transformation_failed, Meta#{
|
||||
log_level => Level, name => Name, action => _Action, message => Msg
|
||||
}),
|
||||
?TRACE(Level, ?TRACE_TAG, Msg, Meta).
|
||||
|
||||
run_message_transformation_failed_hook(Message, Transformation) ->
|
||||
#{name := Name} = Transformation,
|
||||
TransformationContext = #{name => Name},
|
||||
emqx_hooks:run('message.transformation_failed', [Message, TransformationContext]).
|
||||
|
||||
is_payload_properly_encoded(#message{payload = Payload}) ->
|
||||
try iolist_size(Payload) of
|
||||
_ ->
|
||||
true
|
||||
catch
|
||||
error:badarg ->
|
||||
false
|
||||
end.
|
|
@ -0,0 +1,34 @@
|
|||
%%--------------------------------------------------------------------
|
||||
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||
%%--------------------------------------------------------------------
|
||||
-module(emqx_message_transformation_app).
|
||||
|
||||
-behaviour(application).
|
||||
|
||||
%% `application' API
|
||||
-export([start/2, stop/1]).
|
||||
|
||||
%%------------------------------------------------------------------------------
|
||||
%% Type declarations
|
||||
%%------------------------------------------------------------------------------
|
||||
|
||||
%%------------------------------------------------------------------------------
|
||||
%% `application' API
|
||||
%%------------------------------------------------------------------------------
|
||||
|
||||
-spec start(application:start_type(), term()) -> {ok, pid()}.
|
||||
start(_Type, _Args) ->
|
||||
{ok, Sup} = emqx_message_transformation_sup:start_link(),
|
||||
ok = emqx_variform:inject_allowed_module(emqx_message_transformation_bif),
|
||||
ok = emqx_message_transformation_config:add_handler(),
|
||||
ok = emqx_message_transformation:register_hooks(),
|
||||
ok = emqx_message_transformation_config:load(),
|
||||
{ok, Sup}.
|
||||
|
||||
-spec stop(term()) -> ok.
|
||||
stop(_State) ->
|
||||
ok = emqx_message_transformation_config:unload(),
|
||||
ok = emqx_message_transformation:unregister_hooks(),
|
||||
ok = emqx_message_transformation_config:remove_handler(),
|
||||
ok = emqx_variform:erase_allowed_module(emqx_message_transformation_bif),
|
||||
ok.
|
|
@ -0,0 +1,38 @@
|
|||
%%--------------------------------------------------------------------
|
||||
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||
%%--------------------------------------------------------------------
|
||||
-module(emqx_message_transformation_bif).
|
||||
|
||||
%% API
|
||||
-export([
|
||||
json_decode/1,
|
||||
json_encode/1
|
||||
]).
|
||||
|
||||
%%------------------------------------------------------------------------------
|
||||
%% Type declarations
|
||||
%%------------------------------------------------------------------------------
|
||||
|
||||
%%------------------------------------------------------------------------------
|
||||
%% API
|
||||
%%------------------------------------------------------------------------------
|
||||
|
||||
json_encode(X) ->
|
||||
case emqx_utils_json:safe_encode(X) of
|
||||
{ok, JSON} ->
|
||||
JSON;
|
||||
{error, Reason} ->
|
||||
throw(#{reason => json_encode_failure, detail => Reason})
|
||||
end.
|
||||
|
||||
json_decode(JSON) ->
|
||||
case emqx_utils_json:safe_decode(JSON, [return_maps]) of
|
||||
{ok, X} ->
|
||||
X;
|
||||
{error, Reason} ->
|
||||
throw(#{reason => json_decode_failure, detail => Reason})
|
||||
end.
|
||||
|
||||
%%------------------------------------------------------------------------------
|
||||
%% Internal fns
|
||||
%%------------------------------------------------------------------------------
|
|
@ -0,0 +1,395 @@
|
|||
%%--------------------------------------------------------------------
|
||||
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||
%%--------------------------------------------------------------------
|
||||
-module(emqx_message_transformation_config).
|
||||
|
||||
%% API
|
||||
-export([
|
||||
add_handler/0,
|
||||
remove_handler/0,
|
||||
|
||||
load/0,
|
||||
unload/0,
|
||||
|
||||
list/0,
|
||||
reorder/1,
|
||||
lookup/1,
|
||||
insert/1,
|
||||
update/1,
|
||||
delete/1
|
||||
]).
|
||||
|
||||
%% `emqx_config_handler' API
|
||||
-export([pre_config_update/3, post_config_update/5]).
|
||||
|
||||
%% `emqx_config_backup' API
|
||||
-behaviour(emqx_config_backup).
|
||||
-export([import_config/1]).
|
||||
|
||||
%%------------------------------------------------------------------------------
|
||||
%% Type declarations
|
||||
%%------------------------------------------------------------------------------
|
||||
|
||||
-define(CONF_ROOT, message_transformation).
|
||||
-define(CONF_ROOT_BIN, <<"message_transformation">>).
|
||||
-define(TRANSFORMATIONS_CONF_PATH, [?CONF_ROOT, transformations]).
|
||||
|
||||
-type transformation_name() :: emqx_message_transformation:transformation_name().
|
||||
-type transformation() :: emqx_message_transformation:transformation().
|
||||
|
||||
%%------------------------------------------------------------------------------
|
||||
%% API
|
||||
%%------------------------------------------------------------------------------
|
||||
|
||||
-spec add_handler() -> ok.
|
||||
add_handler() ->
|
||||
ok = emqx_config_handler:add_handler([?CONF_ROOT], ?MODULE),
|
||||
ok = emqx_config_handler:add_handler(?TRANSFORMATIONS_CONF_PATH, ?MODULE),
|
||||
ok.
|
||||
|
||||
-spec remove_handler() -> ok.
|
||||
remove_handler() ->
|
||||
ok = emqx_config_handler:remove_handler(?TRANSFORMATIONS_CONF_PATH),
|
||||
ok = emqx_config_handler:remove_handler([?CONF_ROOT]),
|
||||
ok.
|
||||
|
||||
load() ->
|
||||
Transformations = emqx:get_config(?TRANSFORMATIONS_CONF_PATH, []),
|
||||
lists:foreach(
|
||||
fun({Pos, Transformation}) ->
|
||||
ok = emqx_message_transformation_registry:insert(Pos, Transformation)
|
||||
end,
|
||||
lists:enumerate(Transformations)
|
||||
).
|
||||
|
||||
unload() ->
|
||||
Transformations = emqx:get_config(?TRANSFORMATIONS_CONF_PATH, []),
|
||||
lists:foreach(
|
||||
fun({Pos, Transformation}) ->
|
||||
ok = emqx_message_transformation_registry:delete(Transformation, Pos)
|
||||
end,
|
||||
lists:enumerate(Transformations)
|
||||
).
|
||||
|
||||
-spec list() -> [transformation()].
|
||||
list() ->
|
||||
emqx:get_config(?TRANSFORMATIONS_CONF_PATH, []).
|
||||
|
||||
-spec reorder([transformation_name()]) ->
|
||||
{ok, _} | {error, _}.
|
||||
reorder(Order) ->
|
||||
emqx_conf:update(
|
||||
?TRANSFORMATIONS_CONF_PATH,
|
||||
{reorder, Order},
|
||||
#{override_to => cluster}
|
||||
).
|
||||
|
||||
-spec lookup(transformation_name()) -> {ok, transformation()} | {error, not_found}.
|
||||
lookup(Name) ->
|
||||
Transformations = emqx:get_config(?TRANSFORMATIONS_CONF_PATH, []),
|
||||
do_lookup(Name, Transformations).
|
||||
|
||||
-spec insert(transformation()) ->
|
||||
{ok, _} | {error, _}.
|
||||
insert(Transformation) ->
|
||||
emqx_conf:update(
|
||||
?TRANSFORMATIONS_CONF_PATH,
|
||||
{append, Transformation},
|
||||
#{override_to => cluster}
|
||||
).
|
||||
|
||||
-spec update(transformation()) ->
|
||||
{ok, _} | {error, _}.
|
||||
update(Transformation) ->
|
||||
emqx_conf:update(
|
||||
?TRANSFORMATIONS_CONF_PATH,
|
||||
{update, Transformation},
|
||||
#{override_to => cluster}
|
||||
).
|
||||
|
||||
-spec delete(transformation_name()) ->
|
||||
{ok, _} | {error, _}.
|
||||
delete(Name) ->
|
||||
emqx_conf:update(
|
||||
?TRANSFORMATIONS_CONF_PATH,
|
||||
{delete, Name},
|
||||
#{override_to => cluster}
|
||||
).
|
||||
|
||||
%%------------------------------------------------------------------------------
|
||||
%% `emqx_config_handler' API
|
||||
%%------------------------------------------------------------------------------
|
||||
|
||||
pre_config_update(?TRANSFORMATIONS_CONF_PATH, {append, Transformation}, OldTransformations) ->
|
||||
Transformations = OldTransformations ++ [Transformation],
|
||||
{ok, Transformations};
|
||||
pre_config_update(?TRANSFORMATIONS_CONF_PATH, {update, Transformation}, OldTransformations) ->
|
||||
replace(OldTransformations, Transformation);
|
||||
pre_config_update(?TRANSFORMATIONS_CONF_PATH, {delete, Transformation}, OldTransformations) ->
|
||||
delete(OldTransformations, Transformation);
|
||||
pre_config_update(?TRANSFORMATIONS_CONF_PATH, {reorder, Order}, OldTransformations) ->
|
||||
reorder(OldTransformations, Order);
|
||||
pre_config_update([?CONF_ROOT], {merge, NewConfig}, OldConfig) ->
|
||||
#{resulting_config := Config} = prepare_config_merge(NewConfig, OldConfig),
|
||||
{ok, Config};
|
||||
pre_config_update([?CONF_ROOT], {replace, NewConfig}, _OldConfig) ->
|
||||
{ok, NewConfig}.
|
||||
|
||||
post_config_update(
|
||||
?TRANSFORMATIONS_CONF_PATH, {append, #{<<"name">> := Name}}, New, _Old, _AppEnvs
|
||||
) ->
|
||||
{Pos, Transformation} = fetch_with_index(New, Name),
|
||||
ok = emqx_message_transformation_registry:insert(Pos, Transformation),
|
||||
ok;
|
||||
post_config_update(?TRANSFORMATIONS_CONF_PATH, {update, #{<<"name">> := Name}}, New, Old, _AppEnvs) ->
|
||||
{_Pos, OldTransformation} = fetch_with_index(Old, Name),
|
||||
{Pos, NewTransformation} = fetch_with_index(New, Name),
|
||||
ok = emqx_message_transformation_registry:update(OldTransformation, Pos, NewTransformation),
|
||||
ok;
|
||||
post_config_update(?TRANSFORMATIONS_CONF_PATH, {delete, Name}, _New, Old, _AppEnvs) ->
|
||||
{Pos, Transformation} = fetch_with_index(Old, Name),
|
||||
ok = emqx_message_transformation_registry:delete(Transformation, Pos),
|
||||
ok;
|
||||
post_config_update(?TRANSFORMATIONS_CONF_PATH, {reorder, _Order}, New, Old, _AppEnvs) ->
|
||||
ok = emqx_message_transformation_registry:reindex_positions(New, Old),
|
||||
ok;
|
||||
post_config_update([?CONF_ROOT], {merge, _}, ResultingConfig, Old, _AppEnvs) ->
|
||||
#{transformations := ResultingTransformations} = ResultingConfig,
|
||||
#{transformations := OldTransformations} = Old,
|
||||
#{added := NewTransformations0} =
|
||||
emqx_utils:diff_lists(
|
||||
ResultingTransformations,
|
||||
OldTransformations,
|
||||
fun(#{name := N}) -> N end
|
||||
),
|
||||
NewTransformations =
|
||||
lists:map(
|
||||
fun(#{name := Name}) ->
|
||||
{Pos, Transformation} = fetch_with_index(ResultingTransformations, Name),
|
||||
ok = emqx_message_transformation_registry:insert(Pos, Transformation),
|
||||
#{name => Name, pos => Pos}
|
||||
end,
|
||||
NewTransformations0
|
||||
),
|
||||
{ok, #{new_transformations => NewTransformations}};
|
||||
post_config_update([?CONF_ROOT], {replace, Input}, ResultingConfig, Old, _AppEnvs) ->
|
||||
#{
|
||||
new_transformations := NewTransformations,
|
||||
changed_transformations := ChangedTransformations0,
|
||||
deleted_transformations := DeletedTransformations
|
||||
} = prepare_config_replace(Input, Old),
|
||||
#{transformations := ResultingTransformations} = ResultingConfig,
|
||||
#{transformations := OldTransformations} = Old,
|
||||
lists:foreach(
|
||||
fun(Name) ->
|
||||
{Pos, Transformation} = fetch_with_index(OldTransformations, Name),
|
||||
ok = emqx_message_transformation_registry:delete(Transformation, Pos)
|
||||
end,
|
||||
DeletedTransformations
|
||||
),
|
||||
lists:foreach(
|
||||
fun(Name) ->
|
||||
{Pos, Transformation} = fetch_with_index(ResultingTransformations, Name),
|
||||
ok = emqx_message_transformation_registry:insert(Pos, Transformation)
|
||||
end,
|
||||
NewTransformations
|
||||
),
|
||||
ChangedTransformations =
|
||||
lists:map(
|
||||
fun(Name) ->
|
||||
{_Pos, OldTransformation} = fetch_with_index(OldTransformations, Name),
|
||||
{Pos, NewTransformation} = fetch_with_index(ResultingTransformations, Name),
|
||||
ok = emqx_message_transformation_registry:update(
|
||||
OldTransformation, Pos, NewTransformation
|
||||
),
|
||||
#{name => Name, pos => Pos}
|
||||
end,
|
||||
ChangedTransformations0
|
||||
),
|
||||
ok = emqx_message_transformation_registry:reindex_positions(
|
||||
ResultingTransformations, OldTransformations
|
||||
),
|
||||
{ok, #{changed_transformations => ChangedTransformations}}.
|
||||
|
||||
%%------------------------------------------------------------------------------
|
||||
%% `emqx_config_backup' API
|
||||
%%------------------------------------------------------------------------------
|
||||
|
||||
import_config(#{?CONF_ROOT_BIN := RawConf0}) ->
|
||||
Result = emqx_conf:update(
|
||||
[?CONF_ROOT],
|
||||
{merge, RawConf0},
|
||||
#{override_to => cluster, rawconf_with_defaults => true}
|
||||
),
|
||||
case Result of
|
||||
{error, Reason} ->
|
||||
{error, #{root_key => ?CONF_ROOT, reason => Reason}};
|
||||
{ok, _} ->
|
||||
Keys0 = maps:keys(RawConf0),
|
||||
ChangedPaths = Keys0 -- [<<"transformations">>],
|
||||
{ok, #{root_key => ?CONF_ROOT, changed => ChangedPaths}}
|
||||
end;
|
||||
import_config(_RawConf) ->
|
||||
{ok, #{root_key => ?CONF_ROOT, changed => []}}.
|
||||
|
||||
%%------------------------------------------------------------------------------
|
||||
%% Internal fns
|
||||
%%------------------------------------------------------------------------------
|
||||
|
||||
replace(OldTransformations, Transformation = #{<<"name">> := Name}) ->
|
||||
{Found, RevNewTransformations} =
|
||||
lists:foldl(
|
||||
fun
|
||||
(#{<<"name">> := NameIn}, {_FoundIn, Acc}) when NameIn =:= Name ->
|
||||
{true, [Transformation | Acc]};
|
||||
(Val, {FoundIn, Acc}) ->
|
||||
{FoundIn, [Val | Acc]}
|
||||
end,
|
||||
{false, []},
|
||||
OldTransformations
|
||||
),
|
||||
case Found of
|
||||
true ->
|
||||
{ok, lists:reverse(RevNewTransformations)};
|
||||
false ->
|
||||
{error, not_found}
|
||||
end.
|
||||
|
||||
delete(OldTransformations, Name) ->
|
||||
{Found, RevNewTransformations} =
|
||||
lists:foldl(
|
||||
fun
|
||||
(#{<<"name">> := NameIn}, {_FoundIn, Acc}) when NameIn =:= Name ->
|
||||
{true, Acc};
|
||||
(Val, {FoundIn, Acc}) ->
|
||||
{FoundIn, [Val | Acc]}
|
||||
end,
|
||||
{false, []},
|
||||
OldTransformations
|
||||
),
|
||||
case Found of
|
||||
true ->
|
||||
{ok, lists:reverse(RevNewTransformations)};
|
||||
false ->
|
||||
{error, not_found}
|
||||
end.
|
||||
|
||||
reorder(Transformations, Order) ->
|
||||
Context = #{
|
||||
not_found => sets:new([{version, 2}]),
|
||||
duplicated => sets:new([{version, 2}]),
|
||||
res => [],
|
||||
seen => sets:new([{version, 2}])
|
||||
},
|
||||
reorder(Transformations, Order, Context).
|
||||
|
||||
reorder(NotReordered, _Order = [], #{not_found := NotFound0, duplicated := Duplicated0, res := Res}) ->
|
||||
NotFound = sets:to_list(NotFound0),
|
||||
Duplicated = sets:to_list(Duplicated0),
|
||||
case {NotReordered, NotFound, Duplicated} of
|
||||
{[], [], []} ->
|
||||
{ok, lists:reverse(Res)};
|
||||
{_, _, _} ->
|
||||
Error = #{
|
||||
not_found => NotFound,
|
||||
duplicated => Duplicated,
|
||||
not_reordered => [N || #{<<"name">> := N} <- NotReordered]
|
||||
},
|
||||
{error, Error}
|
||||
end;
|
||||
reorder(RemainingTransformations, [Name | Rest], Context0 = #{seen := Seen0}) ->
|
||||
case sets:is_element(Name, Seen0) of
|
||||
true ->
|
||||
Context = maps:update_with(
|
||||
duplicated, fun(S) -> sets:add_element(Name, S) end, Context0
|
||||
),
|
||||
reorder(RemainingTransformations, Rest, Context);
|
||||
false ->
|
||||
case safe_take(Name, RemainingTransformations) of
|
||||
error ->
|
||||
Context = maps:update_with(
|
||||
not_found, fun(S) -> sets:add_element(Name, S) end, Context0
|
||||
),
|
||||
reorder(RemainingTransformations, Rest, Context);
|
||||
{ok, {Transformation, Front, Rear}} ->
|
||||
Context1 = maps:update_with(
|
||||
seen, fun(S) -> sets:add_element(Name, S) end, Context0
|
||||
),
|
||||
Context = maps:update_with(res, fun(Vs) -> [Transformation | Vs] end, Context1),
|
||||
reorder(Front ++ Rear, Rest, Context)
|
||||
end
|
||||
end.
|
||||
|
||||
fetch_with_index([{Pos, #{name := Name} = Transformation} | _Rest], Name) ->
|
||||
{Pos, Transformation};
|
||||
fetch_with_index([{_, _} | Rest], Name) ->
|
||||
fetch_with_index(Rest, Name);
|
||||
fetch_with_index(Transformations, Name) ->
|
||||
fetch_with_index(lists:enumerate(Transformations), Name).
|
||||
|
||||
safe_take(Name, Transformations) ->
|
||||
case lists:splitwith(fun(#{<<"name">> := N}) -> N =/= Name end, Transformations) of
|
||||
{_Front, []} ->
|
||||
error;
|
||||
{Front, [Found | Rear]} ->
|
||||
{ok, {Found, Front, Rear}}
|
||||
end.
|
||||
|
||||
do_lookup(_Name, _Transformations = []) ->
|
||||
{error, not_found};
|
||||
do_lookup(Name, [#{name := Name} = Transformation | _Rest]) ->
|
||||
{ok, Transformation};
|
||||
do_lookup(Name, [_ | Rest]) ->
|
||||
do_lookup(Name, Rest).
|
||||
|
||||
%% "Merging" in the context of the transformation array means:
|
||||
%% * Existing transformations (identified by `name') are left untouched.
|
||||
%% * No transformations are removed.
|
||||
%% * New transformations are appended to the existing list.
|
||||
%% * Existing transformations are not reordered.
|
||||
prepare_config_merge(NewConfig0, OldConfig) ->
|
||||
{ImportedRawTransformations, NewConfigNoTransformations} =
|
||||
case maps:take(<<"transformations">>, NewConfig0) of
|
||||
error ->
|
||||
{[], NewConfig0};
|
||||
{V, R} ->
|
||||
{V, R}
|
||||
end,
|
||||
OldRawTransformations = maps:get(<<"transformations">>, OldConfig, []),
|
||||
#{added := NewRawTransformations} = emqx_utils:diff_lists(
|
||||
ImportedRawTransformations,
|
||||
OldRawTransformations,
|
||||
fun(#{<<"name">> := N}) -> N end
|
||||
),
|
||||
Config0 = emqx_utils_maps:deep_merge(OldConfig, NewConfigNoTransformations),
|
||||
Config = maps:update_with(
|
||||
<<"transformations">>,
|
||||
fun(OldVs) -> OldVs ++ NewRawTransformations end,
|
||||
NewRawTransformations,
|
||||
Config0
|
||||
),
|
||||
#{
|
||||
new_transformations => NewRawTransformations,
|
||||
resulting_config => Config
|
||||
}.
|
||||
|
||||
prepare_config_replace(NewConfig, OldConfig) ->
|
||||
ImportedRawTransformations = maps:get(<<"transformations">>, NewConfig, []),
|
||||
OldTransformations = maps:get(transformations, OldConfig, []),
|
||||
%% Since, at this point, we have an input raw config but a parsed old config, we
|
||||
%% project both to the to have only their names, and consider common names as changed.
|
||||
#{
|
||||
added := NewTransformations,
|
||||
removed := DeletedTransformations,
|
||||
changed := ChangedTransformations0,
|
||||
identical := ChangedTransformations1
|
||||
} = emqx_utils:diff_lists(
|
||||
lists:map(fun(#{<<"name">> := N}) -> N end, ImportedRawTransformations),
|
||||
lists:map(fun(#{name := N}) -> N end, OldTransformations),
|
||||
fun(N) -> N end
|
||||
),
|
||||
#{
|
||||
new_transformations => NewTransformations,
|
||||
changed_transformations => ChangedTransformations0 ++ ChangedTransformations1,
|
||||
deleted_transformations => DeletedTransformations
|
||||
}.
|
|
@ -0,0 +1,656 @@
|
|||
%%--------------------------------------------------------------------
|
||||
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||
%%--------------------------------------------------------------------
|
||||
-module(emqx_message_transformation_http_api).
|
||||
|
||||
-behaviour(minirest_api).
|
||||
|
||||
-include_lib("typerefl/include/types.hrl").
|
||||
-include_lib("hocon/include/hoconsc.hrl").
|
||||
-include_lib("emqx/include/logger.hrl").
|
||||
-include_lib("emqx_utils/include/emqx_utils_api.hrl").
|
||||
|
||||
%% `minirest' and `minirest_trails' API
|
||||
-export([
|
||||
namespace/0,
|
||||
api_spec/0,
|
||||
fields/1,
|
||||
paths/0,
|
||||
schema/1
|
||||
]).
|
||||
|
||||
%% `minirest' handlers
|
||||
-export([
|
||||
'/message_transformations'/2,
|
||||
'/message_transformations/reorder'/2,
|
||||
'/message_transformations/transformation/:name'/2,
|
||||
'/message_transformations/transformation/:name/metrics'/2,
|
||||
'/message_transformations/transformation/:name/metrics/reset'/2,
|
||||
'/message_transformations/transformation/:name/enable/:enable'/2
|
||||
]).
|
||||
|
||||
%%-------------------------------------------------------------------------------------------------
|
||||
%% Type definitions
|
||||
%%-------------------------------------------------------------------------------------------------
|
||||
|
||||
-define(TAGS, [<<"Message Transformation">>]).
|
||||
-define(METRIC_NAME, message_transformation).
|
||||
|
||||
%%-------------------------------------------------------------------------------------------------
|
||||
%% `minirest' and `minirest_trails' API
|
||||
%%-------------------------------------------------------------------------------------------------
|
||||
|
||||
namespace() -> "message_transformation_http_api".
|
||||
|
||||
api_spec() ->
|
||||
emqx_dashboard_swagger:spec(?MODULE, #{check_schema => true}).
|
||||
|
||||
paths() ->
|
||||
[
|
||||
"/message_transformations",
|
||||
"/message_transformations/reorder",
|
||||
"/message_transformations/transformation/:name",
|
||||
"/message_transformations/transformation/:name/metrics",
|
||||
"/message_transformations/transformation/:name/metrics/reset",
|
||||
"/message_transformations/transformation/:name/enable/:enable"
|
||||
].
|
||||
|
||||
schema("/message_transformations") ->
|
||||
#{
|
||||
'operationId' => '/message_transformations',
|
||||
get => #{
|
||||
tags => ?TAGS,
|
||||
summary => <<"List transformations">>,
|
||||
description => ?DESC("list_transformations"),
|
||||
responses =>
|
||||
#{
|
||||
200 =>
|
||||
emqx_dashboard_swagger:schema_with_examples(
|
||||
array(
|
||||
emqx_message_transformation_schema:api_schema(list)
|
||||
),
|
||||
example_return_list()
|
||||
)
|
||||
}
|
||||
},
|
||||
post => #{
|
||||
tags => ?TAGS,
|
||||
summary => <<"Append a new transformation">>,
|
||||
description => ?DESC("append_transformation"),
|
||||
'requestBody' => emqx_dashboard_swagger:schema_with_examples(
|
||||
emqx_message_transformation_schema:api_schema(post),
|
||||
example_input_create()
|
||||
),
|
||||
responses =>
|
||||
#{
|
||||
201 =>
|
||||
emqx_dashboard_swagger:schema_with_examples(
|
||||
emqx_message_transformation_schema:api_schema(post),
|
||||
example_return_create()
|
||||
),
|
||||
400 => error_schema('ALREADY_EXISTS', "Transformation already exists")
|
||||
}
|
||||
},
|
||||
put => #{
|
||||
tags => ?TAGS,
|
||||
summary => <<"Update a transformation">>,
|
||||
description => ?DESC("update_transformation"),
|
||||
'requestBody' => emqx_dashboard_swagger:schema_with_examples(
|
||||
emqx_message_transformation_schema:api_schema(put),
|
||||
example_input_update()
|
||||
),
|
||||
responses =>
|
||||
#{
|
||||
200 =>
|
||||
emqx_dashboard_swagger:schema_with_examples(
|
||||
emqx_message_transformation_schema:api_schema(put),
|
||||
example_return_update()
|
||||
),
|
||||
404 => error_schema('NOT_FOUND', "Transformation not found"),
|
||||
400 => error_schema('BAD_REQUEST', "Bad params")
|
||||
}
|
||||
}
|
||||
};
|
||||
schema("/message_transformations/reorder") ->
|
||||
#{
|
||||
'operationId' => '/message_transformations/reorder',
|
||||
post => #{
|
||||
tags => ?TAGS,
|
||||
summary => <<"Reorder all transformations">>,
|
||||
description => ?DESC("reorder_transformations"),
|
||||
'requestBody' =>
|
||||
emqx_dashboard_swagger:schema_with_examples(
|
||||
ref(reorder),
|
||||
example_input_reorder()
|
||||
),
|
||||
responses =>
|
||||
#{
|
||||
204 => <<"No Content">>,
|
||||
400 => error_schema(
|
||||
'BAD_REQUEST',
|
||||
<<"Bad request">>,
|
||||
[
|
||||
{not_found,
|
||||
mk(array(binary()), #{desc => "Transformations not found"})},
|
||||
{not_reordered,
|
||||
mk(array(binary()), #{
|
||||
desc => "Transformations not referenced in input"
|
||||
})},
|
||||
{duplicated,
|
||||
mk(array(binary()), #{desc => "Duplicated transformations in input"})}
|
||||
]
|
||||
)
|
||||
}
|
||||
}
|
||||
};
|
||||
schema("/message_transformations/transformation/:name") ->
|
||||
#{
|
||||
'operationId' => '/message_transformations/transformation/:name',
|
||||
get => #{
|
||||
tags => ?TAGS,
|
||||
summary => <<"Lookup a transformation">>,
|
||||
description => ?DESC("lookup_transformation"),
|
||||
parameters => [param_path_name()],
|
||||
responses =>
|
||||
#{
|
||||
200 =>
|
||||
emqx_dashboard_swagger:schema_with_examples(
|
||||
array(
|
||||
emqx_message_transformation_schema:api_schema(lookup)
|
||||
),
|
||||
example_return_lookup()
|
||||
),
|
||||
404 => error_schema('NOT_FOUND', "Transformation not found")
|
||||
}
|
||||
},
|
||||
delete => #{
|
||||
tags => ?TAGS,
|
||||
summary => <<"Delete a transformation">>,
|
||||
description => ?DESC("delete_transformation"),
|
||||
parameters => [param_path_name()],
|
||||
responses =>
|
||||
#{
|
||||
204 => <<"Transformation deleted">>,
|
||||
404 => error_schema('NOT_FOUND', "Transformation not found")
|
||||
}
|
||||
}
|
||||
};
|
||||
schema("/message_transformations/transformation/:name/metrics") ->
|
||||
#{
|
||||
'operationId' => '/message_transformations/transformation/:name/metrics',
|
||||
get => #{
|
||||
tags => ?TAGS,
|
||||
summary => <<"Get transformation metrics">>,
|
||||
description => ?DESC("get_transformation_metrics"),
|
||||
parameters => [param_path_name()],
|
||||
responses =>
|
||||
#{
|
||||
200 =>
|
||||
emqx_dashboard_swagger:schema_with_examples(
|
||||
ref(get_metrics),
|
||||
example_return_metrics()
|
||||
),
|
||||
404 => error_schema('NOT_FOUND', "Transformation not found")
|
||||
}
|
||||
}
|
||||
};
|
||||
schema("/message_transformations/transformation/:name/metrics/reset") ->
|
||||
#{
|
||||
'operationId' => '/message_transformations/transformation/:name/metrics/reset',
|
||||
post => #{
|
||||
tags => ?TAGS,
|
||||
summary => <<"Reset transformation metrics">>,
|
||||
description => ?DESC("reset_transformation_metrics"),
|
||||
parameters => [param_path_name()],
|
||||
responses =>
|
||||
#{
|
||||
204 => <<"No content">>,
|
||||
404 => error_schema('NOT_FOUND', "Transformation not found")
|
||||
}
|
||||
}
|
||||
};
|
||||
schema("/message_transformations/transformation/:name/enable/:enable") ->
|
||||
#{
|
||||
'operationId' => '/message_transformations/transformation/:name/enable/:enable',
|
||||
post => #{
|
||||
tags => ?TAGS,
|
||||
summary => <<"Enable or disable transformation">>,
|
||||
description => ?DESC("enable_disable_transformation"),
|
||||
parameters => [param_path_name(), param_path_enable()],
|
||||
responses =>
|
||||
#{
|
||||
204 => <<"No content">>,
|
||||
404 => error_schema('NOT_FOUND', "Transformation not found"),
|
||||
400 => error_schema('BAD_REQUEST', "Bad params")
|
||||
}
|
||||
}
|
||||
}.
|
||||
|
||||
param_path_name() ->
|
||||
{name,
|
||||
mk(
|
||||
binary(),
|
||||
#{
|
||||
in => path,
|
||||
required => true,
|
||||
example => <<"my_transformation">>,
|
||||
desc => ?DESC("param_path_name")
|
||||
}
|
||||
)}.
|
||||
|
||||
param_path_enable() ->
|
||||
{enable,
|
||||
mk(
|
||||
boolean(),
|
||||
#{
|
||||
in => path,
|
||||
required => true,
|
||||
desc => ?DESC("param_path_enable")
|
||||
}
|
||||
)}.
|
||||
|
||||
fields(front) ->
|
||||
[{position, mk(front, #{default => front, required => true, in => body})}];
|
||||
fields(rear) ->
|
||||
[{position, mk(rear, #{default => rear, required => true, in => body})}];
|
||||
fields('after') ->
|
||||
[
|
||||
{position, mk('after', #{default => 'after', required => true, in => body})},
|
||||
{transformation, mk(binary(), #{required => true, in => body})}
|
||||
];
|
||||
fields(before) ->
|
||||
[
|
||||
{position, mk(before, #{default => before, required => true, in => body})},
|
||||
{transformation, mk(binary(), #{required => true, in => body})}
|
||||
];
|
||||
fields(reorder) ->
|
||||
[
|
||||
{order, mk(array(binary()), #{required => true, in => body})}
|
||||
];
|
||||
fields(get_metrics) ->
|
||||
[
|
||||
{metrics, mk(ref(metrics), #{})},
|
||||
{node_metrics, mk(ref(node_metrics), #{})}
|
||||
];
|
||||
fields(metrics) ->
|
||||
[
|
||||
{matched, mk(non_neg_integer(), #{})},
|
||||
{succeeded, mk(non_neg_integer(), #{})},
|
||||
{failed, mk(non_neg_integer(), #{})}
|
||||
];
|
||||
fields(node_metrics) ->
|
||||
[
|
||||
{node, mk(binary(), #{})}
|
||||
| fields(metrics)
|
||||
].
|
||||
|
||||
%%-------------------------------------------------------------------------------------------------
|
||||
%% `minirest' handlers
|
||||
%%-------------------------------------------------------------------------------------------------
|
||||
|
||||
'/message_transformations'(get, _Params) ->
|
||||
Transformations = emqx_message_transformation:list(),
|
||||
?OK(lists:map(fun transformation_out/1, Transformations));
|
||||
'/message_transformations'(post, #{body := Params = #{<<"name">> := Name}}) ->
|
||||
with_transformation(
|
||||
Name,
|
||||
return(?BAD_REQUEST('ALREADY_EXISTS', <<"Transformation already exists">>)),
|
||||
fun() ->
|
||||
case emqx_message_transformation:insert(Params) of
|
||||
{ok, _} ->
|
||||
{ok, Res} = emqx_message_transformation:lookup(Name),
|
||||
{201, transformation_out(Res)};
|
||||
{error, Error} ->
|
||||
?BAD_REQUEST(Error)
|
||||
end
|
||||
end
|
||||
);
|
||||
'/message_transformations'(put, #{body := Params = #{<<"name">> := Name}}) ->
|
||||
with_transformation(
|
||||
Name,
|
||||
fun() ->
|
||||
case emqx_message_transformation:update(Params) of
|
||||
{ok, _} ->
|
||||
{ok, Res} = emqx_message_transformation:lookup(Name),
|
||||
{200, transformation_out(Res)};
|
||||
{error, Error} ->
|
||||
?BAD_REQUEST(Error)
|
||||
end
|
||||
end,
|
||||
not_found()
|
||||
).
|
||||
|
||||
'/message_transformations/transformation/:name'(get, #{bindings := #{name := Name}}) ->
|
||||
with_transformation(
|
||||
Name,
|
||||
fun(Transformation) -> ?OK(transformation_out(Transformation)) end,
|
||||
not_found()
|
||||
);
|
||||
'/message_transformations/transformation/:name'(delete, #{bindings := #{name := Name}}) ->
|
||||
with_transformation(
|
||||
Name,
|
||||
fun() ->
|
||||
case emqx_message_transformation:delete(Name) of
|
||||
{ok, _} ->
|
||||
?NO_CONTENT;
|
||||
{error, Error} ->
|
||||
?BAD_REQUEST(Error)
|
||||
end
|
||||
end,
|
||||
not_found()
|
||||
).
|
||||
|
||||
'/message_transformations/reorder'(post, #{body := #{<<"order">> := Order}}) ->
|
||||
do_reorder(Order).
|
||||
|
||||
'/message_transformations/transformation/:name/enable/:enable'(post, #{
|
||||
bindings := #{name := Name, enable := Enable}
|
||||
}) ->
|
||||
with_transformation(
|
||||
Name,
|
||||
fun(Transformation) -> do_enable_disable(Transformation, Enable) end,
|
||||
not_found()
|
||||
).
|
||||
|
||||
'/message_transformations/transformation/:name/metrics'(get, #{bindings := #{name := Name}}) ->
|
||||
with_transformation(
|
||||
Name,
|
||||
fun() ->
|
||||
Nodes = emqx:running_nodes(),
|
||||
Results = emqx_metrics_proto_v2:get_metrics(Nodes, ?METRIC_NAME, Name, 5_000),
|
||||
NodeResults = lists:zip(Nodes, Results),
|
||||
NodeErrors = [Result || Result = {_Node, {NOk, _}} <- NodeResults, NOk =/= ok],
|
||||
NodeErrors == [] orelse
|
||||
?SLOG(warning, #{
|
||||
msg => "rpc_get_transformation_metrics_errors",
|
||||
errors => NodeErrors
|
||||
}),
|
||||
NodeMetrics = [format_metrics(Node, Metrics) || {Node, {ok, Metrics}} <- NodeResults],
|
||||
Response = #{
|
||||
metrics => aggregate_metrics(NodeMetrics),
|
||||
node_metrics => NodeMetrics
|
||||
},
|
||||
?OK(Response)
|
||||
end,
|
||||
not_found()
|
||||
).
|
||||
|
||||
'/message_transformations/transformation/:name/metrics/reset'(post, #{bindings := #{name := Name}}) ->
|
||||
with_transformation(
|
||||
Name,
|
||||
fun() ->
|
||||
Nodes = emqx:running_nodes(),
|
||||
Results = emqx_metrics_proto_v2:reset_metrics(Nodes, ?METRIC_NAME, Name, 5_000),
|
||||
NodeResults = lists:zip(Nodes, Results),
|
||||
NodeErrors = [Result || Result = {_Node, {NOk, _}} <- NodeResults, NOk =/= ok],
|
||||
NodeErrors == [] orelse
|
||||
?SLOG(warning, #{
|
||||
msg => "rpc_reset_transformation_metrics_errors",
|
||||
errors => NodeErrors
|
||||
}),
|
||||
?NO_CONTENT
|
||||
end,
|
||||
not_found()
|
||||
).
|
||||
|
||||
%%-------------------------------------------------------------------------------------------------
|
||||
%% Internal fns
|
||||
%%-------------------------------------------------------------------------------------------------
|
||||
|
||||
ref(Struct) -> hoconsc:ref(?MODULE, Struct).
|
||||
mk(Type, Opts) -> hoconsc:mk(Type, Opts).
|
||||
array(Type) -> hoconsc:array(Type).
|
||||
|
||||
%% FIXME: all examples
|
||||
example_input_create() ->
|
||||
#{
|
||||
<<"sql_check">> =>
|
||||
#{
|
||||
summary => <<"Using a SQL check">>,
|
||||
value => example_transformation([example_sql_check()])
|
||||
},
|
||||
<<"avro_check">> =>
|
||||
#{
|
||||
summary => <<"Using an Avro schema check">>,
|
||||
value => example_transformation([example_avro_check()])
|
||||
}
|
||||
}.
|
||||
|
||||
example_input_update() ->
|
||||
#{
|
||||
<<"update">> =>
|
||||
#{
|
||||
summary => <<"Update">>,
|
||||
value => example_transformation([example_sql_check()])
|
||||
}
|
||||
}.
|
||||
|
||||
example_input_reorder() ->
|
||||
#{
|
||||
<<"reorder">> =>
|
||||
#{
|
||||
summary => <<"Update">>,
|
||||
value => #{
|
||||
order => [<<"bar">>, <<"foo">>, <<"baz">>]
|
||||
}
|
||||
}
|
||||
}.
|
||||
|
||||
example_return_list() ->
|
||||
OtherVal0 = example_transformation([example_avro_check()]),
|
||||
OtherVal = OtherVal0#{name => <<"other_transformation">>},
|
||||
#{
|
||||
<<"list">> =>
|
||||
#{
|
||||
summary => <<"List">>,
|
||||
value => [
|
||||
example_transformation([example_sql_check()]),
|
||||
OtherVal
|
||||
]
|
||||
}
|
||||
}.
|
||||
|
||||
example_return_create() ->
|
||||
example_input_create().
|
||||
|
||||
example_return_update() ->
|
||||
example_input_update().
|
||||
|
||||
example_return_lookup() ->
|
||||
example_input_create().
|
||||
|
||||
example_return_metrics() ->
|
||||
Metrics = #{
|
||||
matched => 2,
|
||||
succeeded => 1,
|
||||
failed => 1,
|
||||
rate => 1.23,
|
||||
rate_last5m => 0.88,
|
||||
rate_max => 1.87
|
||||
},
|
||||
#{
|
||||
<<"metrics">> =>
|
||||
#{
|
||||
summary => <<"Metrics">>,
|
||||
value => #{
|
||||
metrics => Metrics,
|
||||
node_metrics =>
|
||||
[
|
||||
#{
|
||||
node => <<"emqx@127.0.0.1">>,
|
||||
metrics => Metrics
|
||||
}
|
||||
]
|
||||
}
|
||||
}
|
||||
}.
|
||||
|
||||
example_transformation(Checks) ->
|
||||
#{
|
||||
name => <<"my_transformation">>,
|
||||
enable => true,
|
||||
description => <<"my transformation">>,
|
||||
tags => [<<"transformation">>],
|
||||
topics => [<<"t/+">>],
|
||||
strategy => <<"all_pass">>,
|
||||
failure_action => <<"drop">>,
|
||||
log_failure => #{<<"level">> => <<"info">>},
|
||||
checks => Checks
|
||||
}.
|
||||
|
||||
example_sql_check() ->
|
||||
#{
|
||||
type => <<"sql">>,
|
||||
sql => <<"select payload.temp as t where t > 10">>
|
||||
}.
|
||||
|
||||
example_avro_check() ->
|
||||
#{
|
||||
type => <<"avro">>,
|
||||
schema => <<"my_avro_schema">>
|
||||
}.
|
||||
|
||||
error_schema(Code, Message) ->
|
||||
error_schema(Code, Message, _ExtraFields = []).
|
||||
|
||||
error_schema(Code, Message, ExtraFields) when is_atom(Code) ->
|
||||
error_schema([Code], Message, ExtraFields);
|
||||
error_schema(Codes, Message, ExtraFields) when is_list(Message) ->
|
||||
error_schema(Codes, list_to_binary(Message), ExtraFields);
|
||||
error_schema(Codes, Message, ExtraFields) when is_list(Codes) andalso is_binary(Message) ->
|
||||
ExtraFields ++ emqx_dashboard_swagger:error_codes(Codes, Message).
|
||||
|
||||
do_reorder(Order) ->
|
||||
case emqx_message_transformation:reorder(Order) of
|
||||
{ok, _} ->
|
||||
?NO_CONTENT;
|
||||
{error,
|
||||
{pre_config_update, _HandlerMod, #{
|
||||
not_found := NotFound,
|
||||
duplicated := Duplicated,
|
||||
not_reordered := NotReordered
|
||||
}}} ->
|
||||
Msg0 = ?ERROR_MSG('BAD_REQUEST', <<"Bad request">>),
|
||||
Msg = Msg0#{
|
||||
not_found => NotFound,
|
||||
duplicated => Duplicated,
|
||||
not_reordered => NotReordered
|
||||
},
|
||||
{400, Msg};
|
||||
{error, Error} ->
|
||||
?BAD_REQUEST(Error)
|
||||
end.
|
||||
|
||||
do_enable_disable(Transformation, Enable) ->
|
||||
RawTransformation = make_serializable(Transformation),
|
||||
case emqx_message_transformation:update(RawTransformation#{<<"enable">> => Enable}) of
|
||||
{ok, _} ->
|
||||
?NO_CONTENT;
|
||||
{error, Reason} ->
|
||||
?BAD_REQUEST(Reason)
|
||||
end.
|
||||
|
||||
with_transformation(Name, FoundFn, NotFoundFn) ->
|
||||
case emqx_message_transformation:lookup(Name) of
|
||||
{ok, Transformation} ->
|
||||
{arity, Arity} = erlang:fun_info(FoundFn, arity),
|
||||
case Arity of
|
||||
1 -> FoundFn(Transformation);
|
||||
0 -> FoundFn()
|
||||
end;
|
||||
{error, not_found} ->
|
||||
NotFoundFn()
|
||||
end.
|
||||
|
||||
return(Response) ->
|
||||
fun() -> Response end.
|
||||
|
||||
not_found() ->
|
||||
return(?NOT_FOUND(<<"Transformation not found">>)).
|
||||
|
||||
make_serializable(Transformation0) ->
|
||||
Schema = emqx_message_transformation_schema,
|
||||
Transformation1 = transformation_out(Transformation0),
|
||||
Transformation = emqx_utils_maps:binary_key_map(Transformation1),
|
||||
RawConfig = #{
|
||||
<<"message_transformation">> => #{
|
||||
<<"transformations">> =>
|
||||
[Transformation]
|
||||
}
|
||||
},
|
||||
#{
|
||||
<<"message_transformation">> := #{
|
||||
<<"transformations">> :=
|
||||
[Serialized]
|
||||
}
|
||||
} =
|
||||
hocon_tconf:make_serializable(Schema, RawConfig, #{}),
|
||||
Serialized.
|
||||
|
||||
format_metrics(Node, #{
|
||||
counters := #{
|
||||
'matched' := Matched,
|
||||
'succeeded' := Succeeded,
|
||||
'failed' := Failed
|
||||
},
|
||||
rate := #{
|
||||
'matched' := #{
|
||||
current := MatchedRate,
|
||||
last5m := Matched5mRate,
|
||||
max := MatchedMaxRate
|
||||
}
|
||||
}
|
||||
}) ->
|
||||
#{
|
||||
metrics => #{
|
||||
'matched' => Matched,
|
||||
'succeeded' => Succeeded,
|
||||
'failed' => Failed,
|
||||
rate => MatchedRate,
|
||||
rate_last5m => Matched5mRate,
|
||||
rate_max => MatchedMaxRate
|
||||
},
|
||||
node => Node
|
||||
};
|
||||
format_metrics(Node, _) ->
|
||||
#{
|
||||
metrics => #{
|
||||
'matched' => 0,
|
||||
'succeeded' => 0,
|
||||
'failed' => 0,
|
||||
rate => 0,
|
||||
rate_last5m => 0,
|
||||
rate_max => 0
|
||||
},
|
||||
node => Node
|
||||
}.
|
||||
|
||||
aggregate_metrics(NodeMetrics) ->
|
||||
ErrorLogger = fun(_) -> ok end,
|
||||
lists:foldl(
|
||||
fun(#{metrics := Metrics}, Acc) ->
|
||||
emqx_utils_maps:best_effort_recursive_sum(Metrics, Acc, ErrorLogger)
|
||||
end,
|
||||
#{},
|
||||
NodeMetrics
|
||||
).
|
||||
|
||||
transformation_out(Transformation) ->
|
||||
maps:update_with(
|
||||
operations,
|
||||
fun(Os) -> lists:map(fun operation_out/1, Os) end,
|
||||
Transformation
|
||||
).
|
||||
|
||||
operation_out(Operation0) ->
|
||||
%% TODO: remove injected bif module
|
||||
Operation = maps:update_with(
|
||||
value,
|
||||
fun(V) -> iolist_to_binary(emqx_variform:decompile(V)) end,
|
||||
Operation0
|
||||
),
|
||||
maps:update_with(
|
||||
key,
|
||||
fun(Path) -> iolist_to_binary(lists:join(".", Path)) end,
|
||||
Operation
|
||||
).
|
|
@ -0,0 +1,280 @@
|
|||
%%--------------------------------------------------------------------
|
||||
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||
%%--------------------------------------------------------------------
|
||||
-module(emqx_message_transformation_registry).
|
||||
|
||||
-behaviour(gen_server).
|
||||
|
||||
%% API
|
||||
-export([
|
||||
lookup/1,
|
||||
insert/2,
|
||||
update/3,
|
||||
delete/2,
|
||||
reindex_positions/2,
|
||||
|
||||
matching_transformations/1,
|
||||
|
||||
%% metrics
|
||||
get_metrics/1,
|
||||
inc_matched/1,
|
||||
inc_succeeded/1,
|
||||
inc_failed/1,
|
||||
|
||||
start_link/0,
|
||||
metrics_worker_spec/0
|
||||
]).
|
||||
|
||||
%% `gen_server' API
|
||||
-export([
|
||||
init/1,
|
||||
handle_call/3,
|
||||
handle_cast/2
|
||||
]).
|
||||
|
||||
%%------------------------------------------------------------------------------
|
||||
%% Type declarations
|
||||
%%------------------------------------------------------------------------------
|
||||
|
||||
-define(TRANSFORMATION_TOPIC_INDEX, emqx_message_transformation_index).
|
||||
-define(TRANSFORMATION_TAB, emqx_message_transformation_tab).
|
||||
|
||||
-define(METRIC_NAME, message_transformation).
|
||||
-define(METRICS, [
|
||||
'matched',
|
||||
'succeeded',
|
||||
'failed'
|
||||
]).
|
||||
-define(RATE_METRICS, ['matched']).
|
||||
|
||||
-type transformation_name() :: binary().
|
||||
%% TODO
|
||||
-type transformation() :: #{atom() => term()}.
|
||||
-type position_index() :: pos_integer().
|
||||
|
||||
-record(reindex_positions, {
|
||||
new_transformations :: [transformation()],
|
||||
old_transformations :: [transformation()]
|
||||
}).
|
||||
-record(insert, {pos :: position_index(), transformation :: transformation()}).
|
||||
-record(update, {old :: transformation(), pos :: position_index(), new :: transformation()}).
|
||||
-record(delete, {transformation :: transformation(), pos :: position_index()}).
|
||||
|
||||
%%------------------------------------------------------------------------------
|
||||
%% API
|
||||
%%------------------------------------------------------------------------------
|
||||
|
||||
-spec start_link() -> gen_server:start_ret().
|
||||
start_link() ->
|
||||
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
|
||||
|
||||
-spec lookup(transformation_name()) ->
|
||||
{ok, transformation()} | {error, not_found}.
|
||||
lookup(Name) ->
|
||||
case emqx_utils_ets:lookup_value(?TRANSFORMATION_TAB, Name, undefined) of
|
||||
undefined ->
|
||||
{error, not_found};
|
||||
Transformation ->
|
||||
{ok, Transformation}
|
||||
end.
|
||||
|
||||
-spec reindex_positions([transformation()], [transformation()]) -> ok.
|
||||
reindex_positions(NewTransformations, OldTransformations) ->
|
||||
gen_server:call(
|
||||
?MODULE,
|
||||
#reindex_positions{
|
||||
new_transformations = NewTransformations,
|
||||
old_transformations = OldTransformations
|
||||
},
|
||||
infinity
|
||||
).
|
||||
|
||||
-spec insert(position_index(), transformation()) -> ok.
|
||||
insert(Pos, Transformation) ->
|
||||
gen_server:call(?MODULE, #insert{pos = Pos, transformation = Transformation}, infinity).
|
||||
|
||||
-spec update(transformation(), position_index(), transformation()) -> ok.
|
||||
update(Old, Pos, New) ->
|
||||
gen_server:call(?MODULE, #update{old = Old, pos = Pos, new = New}, infinity).
|
||||
|
||||
-spec delete(transformation(), position_index()) -> ok.
|
||||
delete(Transformation, Pos) ->
|
||||
gen_server:call(?MODULE, #delete{transformation = Transformation, pos = Pos}, infinity).
|
||||
|
||||
%% @doc Returns a list of matching transformation names, sorted by their configuration order.
|
||||
-spec matching_transformations(emqx_types:topic()) -> [transformation()].
|
||||
matching_transformations(Topic) ->
|
||||
Transformations0 =
|
||||
lists:flatmap(
|
||||
fun(M) ->
|
||||
case emqx_topic_index:get_record(M, ?TRANSFORMATION_TOPIC_INDEX) of
|
||||
[Name] ->
|
||||
[Name];
|
||||
_ ->
|
||||
[]
|
||||
end
|
||||
end,
|
||||
emqx_topic_index:matches(Topic, ?TRANSFORMATION_TOPIC_INDEX, [unique])
|
||||
),
|
||||
lists:flatmap(
|
||||
fun(Name) ->
|
||||
case lookup(Name) of
|
||||
{ok, Transformation} ->
|
||||
[Transformation];
|
||||
_ ->
|
||||
[]
|
||||
end
|
||||
end,
|
||||
Transformations0
|
||||
).
|
||||
|
||||
-spec metrics_worker_spec() -> supervisor:child_spec().
|
||||
metrics_worker_spec() ->
|
||||
emqx_metrics_worker:child_spec(message_transformation_metrics, ?METRIC_NAME).
|
||||
|
||||
-spec get_metrics(transformation_name()) -> emqx_metrics_worker:metrics().
|
||||
get_metrics(Name) ->
|
||||
emqx_metrics_worker:get_metrics(?METRIC_NAME, Name).
|
||||
|
||||
-spec inc_matched(transformation_name()) -> ok.
|
||||
inc_matched(Name) ->
|
||||
emqx_metrics_worker:inc(?METRIC_NAME, Name, 'matched').
|
||||
|
||||
-spec inc_succeeded(transformation_name()) -> ok.
|
||||
inc_succeeded(Name) ->
|
||||
emqx_metrics_worker:inc(?METRIC_NAME, Name, 'succeeded').
|
||||
|
||||
-spec inc_failed(transformation_name()) -> ok.
|
||||
inc_failed(Name) ->
|
||||
emqx_metrics_worker:inc(?METRIC_NAME, Name, 'failed').
|
||||
|
||||
%%------------------------------------------------------------------------------
|
||||
%% `gen_server' API
|
||||
%%------------------------------------------------------------------------------
|
||||
|
||||
init(_) ->
|
||||
create_tables(),
|
||||
State = #{},
|
||||
{ok, State}.
|
||||
|
||||
handle_call(
|
||||
#reindex_positions{
|
||||
new_transformations = NewTransformations,
|
||||
old_transformations = OldTransformations
|
||||
},
|
||||
_From,
|
||||
State
|
||||
) ->
|
||||
do_reindex_positions(NewTransformations, OldTransformations),
|
||||
{reply, ok, State};
|
||||
handle_call(#insert{pos = Pos, transformation = Transformation}, _From, State) ->
|
||||
do_insert(Pos, Transformation),
|
||||
{reply, ok, State};
|
||||
handle_call(#update{old = OldTransformation, pos = Pos, new = NewTransformation}, _From, State) ->
|
||||
ok = do_update(OldTransformation, Pos, NewTransformation),
|
||||
{reply, ok, State};
|
||||
handle_call(#delete{transformation = Transformation, pos = Pos}, _From, State) ->
|
||||
do_delete(Transformation, Pos),
|
||||
{reply, ok, State};
|
||||
handle_call(_Call, _From, State) ->
|
||||
{reply, ignored, State}.
|
||||
|
||||
handle_cast(_Cast, State) ->
|
||||
{noreply, State}.
|
||||
|
||||
%%------------------------------------------------------------------------------
|
||||
%% Internal fns
|
||||
%%------------------------------------------------------------------------------
|
||||
|
||||
create_tables() ->
|
||||
_ = emqx_utils_ets:new(?TRANSFORMATION_TOPIC_INDEX, [
|
||||
public, ordered_set, {read_concurrency, true}
|
||||
]),
|
||||
_ = emqx_utils_ets:new(?TRANSFORMATION_TAB, [public, ordered_set, {read_concurrency, true}]),
|
||||
ok.
|
||||
|
||||
do_reindex_positions(NewTransformations, OldTransformations) ->
|
||||
lists:foreach(
|
||||
fun({Pos, Transformation}) ->
|
||||
#{topics := Topics} = Transformation,
|
||||
delete_topic_index(Pos, Topics)
|
||||
end,
|
||||
lists:enumerate(OldTransformations)
|
||||
),
|
||||
lists:foreach(
|
||||
fun({Pos, Transformation}) ->
|
||||
#{
|
||||
name := Name,
|
||||
topics := Topics
|
||||
} = Transformation,
|
||||
do_insert_into_tab(Name, Transformation, Pos),
|
||||
upsert_topic_index(Name, Pos, Topics)
|
||||
end,
|
||||
lists:enumerate(NewTransformations)
|
||||
).
|
||||
|
||||
do_insert(Pos, Transformation) ->
|
||||
#{
|
||||
enable := Enabled,
|
||||
name := Name,
|
||||
topics := Topics
|
||||
} = Transformation,
|
||||
maybe_create_metrics(Name),
|
||||
do_insert_into_tab(Name, Transformation, Pos),
|
||||
Enabled andalso upsert_topic_index(Name, Pos, Topics),
|
||||
ok.
|
||||
|
||||
do_update(OldTransformation, Pos, NewTransformation) ->
|
||||
#{topics := OldTopics} = OldTransformation,
|
||||
#{
|
||||
enable := Enabled,
|
||||
name := Name,
|
||||
topics := NewTopics
|
||||
} = NewTransformation,
|
||||
maybe_create_metrics(Name),
|
||||
do_insert_into_tab(Name, NewTransformation, Pos),
|
||||
delete_topic_index(Pos, OldTopics),
|
||||
Enabled andalso upsert_topic_index(Name, Pos, NewTopics),
|
||||
ok.
|
||||
|
||||
do_delete(Transformation, Pos) ->
|
||||
#{
|
||||
name := Name,
|
||||
topics := Topics
|
||||
} = Transformation,
|
||||
ets:delete(?TRANSFORMATION_TAB, Name),
|
||||
delete_topic_index(Pos, Topics),
|
||||
drop_metrics(Name),
|
||||
ok.
|
||||
|
||||
do_insert_into_tab(Name, Transformation0, Pos) ->
|
||||
Transformation = Transformation0#{pos => Pos},
|
||||
ets:insert(?TRANSFORMATION_TAB, {Name, Transformation}),
|
||||
ok.
|
||||
|
||||
maybe_create_metrics(Name) ->
|
||||
case emqx_metrics_worker:has_metrics(?METRIC_NAME, Name) of
|
||||
true ->
|
||||
ok = emqx_metrics_worker:reset_metrics(?METRIC_NAME, Name);
|
||||
false ->
|
||||
ok = emqx_metrics_worker:create_metrics(?METRIC_NAME, Name, ?METRICS, ?RATE_METRICS)
|
||||
end.
|
||||
|
||||
drop_metrics(Name) ->
|
||||
ok = emqx_metrics_worker:clear_metrics(?METRIC_NAME, Name).
|
||||
|
||||
upsert_topic_index(Name, Pos, Topics) ->
|
||||
lists:foreach(
|
||||
fun(Topic) ->
|
||||
true = emqx_topic_index:insert(Topic, Pos, Name, ?TRANSFORMATION_TOPIC_INDEX)
|
||||
end,
|
||||
Topics
|
||||
).
|
||||
|
||||
delete_topic_index(Pos, Topics) ->
|
||||
lists:foreach(
|
||||
fun(Topic) ->
|
||||
true = emqx_topic_index:delete(Topic, Pos, ?TRANSFORMATION_TOPIC_INDEX)
|
||||
end,
|
||||
Topics
|
||||
).
|
|
@ -0,0 +1,331 @@
|
|||
%%--------------------------------------------------------------------
|
||||
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||
%%--------------------------------------------------------------------
|
||||
-module(emqx_message_transformation_schema).
|
||||
|
||||
-include_lib("typerefl/include/types.hrl").
|
||||
-include_lib("hocon/include/hoconsc.hrl").
|
||||
|
||||
%% `hocon_schema' API
|
||||
-export([
|
||||
namespace/0,
|
||||
roots/0,
|
||||
fields/1
|
||||
]).
|
||||
|
||||
%% `minirest_trails' API
|
||||
-export([
|
||||
api_schema/1
|
||||
]).
|
||||
|
||||
%%------------------------------------------------------------------------------
|
||||
%% Type declarations
|
||||
%%------------------------------------------------------------------------------
|
||||
|
||||
-define(BIF_MOD_STR, "emqx_message_transformation_bif").
|
||||
|
||||
-define(ALLOWED_ROOT_KEYS, [
|
||||
<<"payload">>,
|
||||
<<"qos">>,
|
||||
<<"retain">>,
|
||||
<<"topic">>,
|
||||
<<"user_property">>
|
||||
]).
|
||||
|
||||
-type key() :: list(binary()) | binary().
|
||||
-reflect_type([key/0]).
|
||||
|
||||
%%------------------------------------------------------------------------------
|
||||
%% `hocon_schema' API
|
||||
%%------------------------------------------------------------------------------
|
||||
|
||||
namespace() -> message_transformation.
|
||||
|
||||
roots() ->
|
||||
[
|
||||
{message_transformation,
|
||||
mk(ref(message_transformation), #{importance => ?IMPORTANCE_HIDDEN})}
|
||||
].
|
||||
|
||||
fields(message_transformation) ->
|
||||
[
|
||||
{transformations,
|
||||
mk(
|
||||
hoconsc:array(ref(transformation)),
|
||||
#{
|
||||
default => [],
|
||||
desc => ?DESC("transformations"),
|
||||
validator => fun validate_unique_names/1
|
||||
}
|
||||
)}
|
||||
];
|
||||
fields(transformation) ->
|
||||
[
|
||||
{tags, emqx_schema:tags_schema()},
|
||||
{description, emqx_schema:description_schema()},
|
||||
{enable, mk(boolean(), #{desc => ?DESC("config_enable"), default => true})},
|
||||
{name,
|
||||
mk(
|
||||
binary(),
|
||||
#{
|
||||
required => true,
|
||||
validator => fun emqx_resource:validate_name/1,
|
||||
desc => ?DESC("name")
|
||||
}
|
||||
)},
|
||||
{topics,
|
||||
mk(
|
||||
hoconsc:union([binary(), hoconsc:array(binary())]),
|
||||
#{
|
||||
desc => ?DESC("topics"),
|
||||
converter => fun ensure_array/2,
|
||||
validator => fun validate_unique_topics/1,
|
||||
required => true
|
||||
}
|
||||
)},
|
||||
{failure_action,
|
||||
mk(
|
||||
hoconsc:enum([drop, disconnect, ignore]),
|
||||
#{desc => ?DESC("failure_action"), required => true}
|
||||
)},
|
||||
{log_failure,
|
||||
mk(
|
||||
ref(log_failure),
|
||||
#{desc => ?DESC("log_failure_at"), default => #{}}
|
||||
)},
|
||||
{payload_decoder,
|
||||
mk(
|
||||
hoconsc:union(fun payload_serde_member_selector/1),
|
||||
#{desc => ?DESC("payload_decoder"), default => #{<<"type">> => <<"none">>}}
|
||||
)},
|
||||
{payload_encoder,
|
||||
mk(
|
||||
hoconsc:union(fun payload_serde_member_selector/1),
|
||||
#{desc => ?DESC("payload_encoder"), default => #{<<"type">> => <<"none">>}}
|
||||
)},
|
||||
{operations,
|
||||
mk(
|
||||
hoconsc:array(ref(operation)),
|
||||
#{
|
||||
desc => ?DESC("operation"),
|
||||
required => true,
|
||||
validator => fun validate_operations/1
|
||||
}
|
||||
)}
|
||||
];
|
||||
fields(log_failure) ->
|
||||
[
|
||||
{level,
|
||||
mk(
|
||||
hoconsc:enum([error, warning, notice, info, debug, none]),
|
||||
#{desc => ?DESC("log_failure_at"), default => info}
|
||||
)}
|
||||
];
|
||||
fields(operation) ->
|
||||
[
|
||||
%% TODO: more strict type check??
|
||||
{key,
|
||||
mk(
|
||||
typerefl:alias("string", key()), #{
|
||||
desc => ?DESC("operation_key"),
|
||||
required => true,
|
||||
converter => fun parse_key_path/2
|
||||
}
|
||||
)},
|
||||
{value,
|
||||
mk(typerefl:alias("string", any()), #{
|
||||
desc => ?DESC("operation_value"),
|
||||
required => true,
|
||||
converter => fun compile_variform/2
|
||||
})}
|
||||
];
|
||||
fields(payload_serde_none) ->
|
||||
[{type, mk(none, #{default => none, desc => ?DESC("payload_serde_none_type")})}];
|
||||
fields(payload_serde_json) ->
|
||||
[{type, mk(json, #{default => json, desc => ?DESC("payload_serde_json_type")})}];
|
||||
fields(payload_serde_avro) ->
|
||||
[
|
||||
{type, mk(avro, #{default => avro, desc => ?DESC("payload_serde_avro_type")})},
|
||||
{schema, mk(binary(), #{required => true, desc => ?DESC("payload_serde_avro_schema")})}
|
||||
];
|
||||
fields(payload_serde_protobuf) ->
|
||||
[
|
||||
{type, mk(protobuf, #{default => protobuf, desc => ?DESC("payload_serde_protobuf_type")})},
|
||||
{schema, mk(binary(), #{required => true, desc => ?DESC("payload_serde_protobuf_schema")})},
|
||||
{message_type,
|
||||
mk(binary(), #{required => true, desc => ?DESC("payload_serde_protobuf_message_type")})}
|
||||
].
|
||||
|
||||
%%------------------------------------------------------------------------------
|
||||
%% `minirest_trails' API
|
||||
%%------------------------------------------------------------------------------
|
||||
|
||||
api_schema(list) ->
|
||||
hoconsc:array(ref(transformation));
|
||||
api_schema(lookup) ->
|
||||
ref(transformation);
|
||||
api_schema(post) ->
|
||||
ref(transformation);
|
||||
api_schema(put) ->
|
||||
ref(transformation).
|
||||
|
||||
%%------------------------------------------------------------------------------
|
||||
%% Internal exports
|
||||
%%------------------------------------------------------------------------------
|
||||
|
||||
%%------------------------------------------------------------------------------
|
||||
%% Internal fns
|
||||
%%------------------------------------------------------------------------------
|
||||
|
||||
mk(Type, Meta) -> hoconsc:mk(Type, Meta).
|
||||
ref(Name) -> hoconsc:ref(?MODULE, Name).
|
||||
|
||||
payload_serde_member_selector(all_union_members) ->
|
||||
payload_serde_refs();
|
||||
payload_serde_member_selector({value, V}) ->
|
||||
payload_serde_refs(V).
|
||||
|
||||
payload_serde_refs() ->
|
||||
[
|
||||
payload_serde_none,
|
||||
payload_serde_json,
|
||||
payload_serde_avro,
|
||||
payload_serde_protobuf
|
||||
].
|
||||
payload_serde_refs(#{<<"type">> := Type} = V) when is_atom(Type) ->
|
||||
payload_serde_refs(V#{<<"type">> := atom_to_binary(Type)});
|
||||
payload_serde_refs(#{<<"type">> := <<"none">>}) ->
|
||||
[ref(payload_serde_none)];
|
||||
payload_serde_refs(#{<<"type">> := <<"json">>}) ->
|
||||
[ref(payload_serde_json)];
|
||||
payload_serde_refs(#{<<"type">> := <<"avro">>}) ->
|
||||
[ref(payload_serde_avro)];
|
||||
payload_serde_refs(#{<<"type">> := <<"protobuf">>}) ->
|
||||
[ref(payload_serde_protobuf)];
|
||||
payload_serde_refs(_Value) ->
|
||||
Expected = lists:join(
|
||||
" | ",
|
||||
[
|
||||
Name
|
||||
|| T <- payload_serde_refs(),
|
||||
"payload_serde_" ++ Name <- [atom_to_list(T)]
|
||||
]
|
||||
),
|
||||
throw(#{
|
||||
field_name => type,
|
||||
expected => iolist_to_binary(Expected)
|
||||
}).
|
||||
|
||||
ensure_array(undefined, _) -> undefined;
|
||||
ensure_array(L, _) when is_list(L) -> L;
|
||||
ensure_array(B, _) -> [B].
|
||||
|
||||
validate_unique_names(Transformations0) ->
|
||||
Transformations = emqx_utils_maps:binary_key_map(Transformations0),
|
||||
do_validate_unique_names(Transformations, #{}).
|
||||
|
||||
do_validate_unique_names(_Transformations = [], _Acc) ->
|
||||
ok;
|
||||
do_validate_unique_names([#{<<"name">> := Name} | _Rest], Acc) when is_map_key(Name, Acc) ->
|
||||
{error, <<"duplicated name: ", Name/binary>>};
|
||||
do_validate_unique_names([#{<<"name">> := Name} | Rest], Acc) ->
|
||||
do_validate_unique_names(Rest, Acc#{Name => true}).
|
||||
|
||||
validate_unique_topics(Topics) ->
|
||||
Grouped = maps:groups_from_list(
|
||||
fun(T) -> T end,
|
||||
Topics
|
||||
),
|
||||
DuplicatedMap = maps:filter(
|
||||
fun(_T, Ts) -> length(Ts) > 1 end,
|
||||
Grouped
|
||||
),
|
||||
case maps:keys(DuplicatedMap) of
|
||||
[] ->
|
||||
ok;
|
||||
Duplicated ->
|
||||
Msg = iolist_to_binary([
|
||||
<<"duplicated topics: ">>,
|
||||
lists:join(", ", Duplicated)
|
||||
]),
|
||||
{error, Msg}
|
||||
end.
|
||||
|
||||
validate_operations([]) ->
|
||||
{error, <<"at least one operation must be defined">>};
|
||||
validate_operations([_ | _]) ->
|
||||
ok.
|
||||
|
||||
compile_variform(Expression, #{make_serializable := true}) ->
|
||||
case is_binary(Expression) of
|
||||
true ->
|
||||
Expression;
|
||||
false ->
|
||||
emqx_variform:decompile(Expression)
|
||||
end;
|
||||
compile_variform(Expression, _Opts) ->
|
||||
case emqx_variform:compile(Expression) of
|
||||
{ok, Compiled} ->
|
||||
transform_bifs(Compiled);
|
||||
{error, Reason} ->
|
||||
throw(#{expression => Expression, reason => Reason})
|
||||
end.
|
||||
|
||||
transform_bifs(#{form := Form} = Compiled) ->
|
||||
Compiled#{form := traverse_transform_bifs(Form)}.
|
||||
|
||||
traverse_transform_bifs({call, FnName, Args}) ->
|
||||
FQFnName = fully_qualify_local_bif(FnName),
|
||||
{call, FQFnName, lists:map(fun traverse_transform_bifs/1, Args)};
|
||||
traverse_transform_bifs({array, Elems}) ->
|
||||
{array, lists:map(fun traverse_transform_bifs/1, Elems)};
|
||||
traverse_transform_bifs(Node) ->
|
||||
Node.
|
||||
|
||||
fully_qualify_local_bif("json_encode") ->
|
||||
?BIF_MOD_STR ++ ".json_encode";
|
||||
fully_qualify_local_bif("json_decode") ->
|
||||
?BIF_MOD_STR ++ ".json_decode";
|
||||
fully_qualify_local_bif(FnName) ->
|
||||
FnName.
|
||||
|
||||
parse_key_path(<<"">>, _Opts) ->
|
||||
throw(#{reason => <<"key must be non-empty">>});
|
||||
parse_key_path(Key, #{make_serializable := true}) ->
|
||||
case is_binary(Key) of
|
||||
true ->
|
||||
Key;
|
||||
false ->
|
||||
iolist_to_binary(lists:join(".", Key))
|
||||
end;
|
||||
parse_key_path(Key, _Opts) when is_binary(Key) ->
|
||||
Parts = binary:split(Key, <<".">>, [global]),
|
||||
case lists:any(fun(P) -> P =:= <<"">> end, Parts) of
|
||||
true ->
|
||||
throw(#{invalid_key => Key});
|
||||
false ->
|
||||
ok
|
||||
end,
|
||||
case Parts of
|
||||
[<<"payload">> | _] ->
|
||||
ok;
|
||||
[<<"qos">>] ->
|
||||
ok;
|
||||
[<<"retain">>] ->
|
||||
ok;
|
||||
[<<"topic">>] ->
|
||||
ok;
|
||||
[<<"user_property">>, _] ->
|
||||
ok;
|
||||
[<<"user_property">>] ->
|
||||
throw(#{
|
||||
invalid_key => Key, reason => <<"must define exactly one key inside user property">>
|
||||
});
|
||||
[<<"user_property">> | _] ->
|
||||
throw(#{
|
||||
invalid_key => Key, reason => <<"must define exactly one key inside user property">>
|
||||
});
|
||||
_ ->
|
||||
throw(#{invalid_key => Key, allowed_root_keys => ?ALLOWED_ROOT_KEYS})
|
||||
end,
|
||||
Parts.
|
|
@ -0,0 +1,47 @@
|
|||
%%--------------------------------------------------------------------
|
||||
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||
%%--------------------------------------------------------------------
|
||||
-module(emqx_message_transformation_sup).
|
||||
|
||||
-behaviour(supervisor).
|
||||
|
||||
%% API
|
||||
-export([start_link/0]).
|
||||
|
||||
%% `supervisor' API
|
||||
-export([init/1]).
|
||||
|
||||
%%------------------------------------------------------------------------------
|
||||
%% API
|
||||
%%------------------------------------------------------------------------------
|
||||
|
||||
start_link() ->
|
||||
supervisor:start_link({local, ?MODULE}, ?MODULE, []).
|
||||
|
||||
%%------------------------------------------------------------------------------
|
||||
%% `supervisor' API
|
||||
%%------------------------------------------------------------------------------
|
||||
|
||||
init([]) ->
|
||||
Registry = worker_spec(emqx_message_transformation_registry),
|
||||
Metrics = emqx_message_transformation_registry:metrics_worker_spec(),
|
||||
SupFlags = #{
|
||||
strategy => one_for_one,
|
||||
intensity => 10,
|
||||
period => 10
|
||||
},
|
||||
ChildSpecs = [Metrics, Registry],
|
||||
{ok, {SupFlags, ChildSpecs}}.
|
||||
|
||||
%%------------------------------------------------------------------------------
|
||||
%% Internal fns
|
||||
%%------------------------------------------------------------------------------
|
||||
|
||||
worker_spec(Mod) ->
|
||||
#{
|
||||
id => Mod,
|
||||
start => {Mod, start_link, []},
|
||||
restart => permanent,
|
||||
shutdown => 5_000,
|
||||
type => worker
|
||||
}.
|
File diff suppressed because it is too large
Load Diff
|
@ -0,0 +1,174 @@
|
|||
%%--------------------------------------------------------------------
|
||||
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||
%%--------------------------------------------------------------------
|
||||
-module(emqx_message_transformation_tests).
|
||||
|
||||
-include_lib("eunit/include/eunit.hrl").
|
||||
|
||||
-define(TRANSFORMATIONS_PATH, "message_transformation.transformations").
|
||||
|
||||
%%------------------------------------------------------------------------------
|
||||
%% Helper fns
|
||||
%%------------------------------------------------------------------------------
|
||||
|
||||
bin(X) -> emqx_utils_conv:bin(X).
|
||||
|
||||
parse_and_check(InnerConfigs) ->
|
||||
RootBin = <<"message_transformation">>,
|
||||
InnerBin = <<"transformations">>,
|
||||
RawConf = #{RootBin => #{InnerBin => InnerConfigs}},
|
||||
#{RootBin := #{InnerBin := Checked}} = hocon_tconf:check_plain(
|
||||
emqx_message_transformation_schema,
|
||||
RawConf,
|
||||
#{
|
||||
required => false,
|
||||
atom_key => false,
|
||||
make_serializable => false
|
||||
}
|
||||
),
|
||||
Checked.
|
||||
|
||||
transformation(Name, Operations) ->
|
||||
transformation(Name, Operations, _Overrides = #{}).
|
||||
|
||||
transformation(Name, Operations0, Overrides) ->
|
||||
Operations = lists:map(fun normalize_operation/1, Operations0),
|
||||
Default = #{
|
||||
<<"tags">> => [<<"some">>, <<"tags">>],
|
||||
<<"description">> => <<"my transformation">>,
|
||||
<<"enable">> => true,
|
||||
<<"name">> => Name,
|
||||
<<"topics">> => [<<"t/+">>],
|
||||
<<"failure_action">> => <<"drop">>,
|
||||
<<"log_failure">> => #{<<"level">> => <<"warning">>},
|
||||
<<"payload_decoder">> => #{<<"type">> => <<"json">>},
|
||||
<<"payload_encoder">> => #{<<"type">> => <<"json">>},
|
||||
<<"operations">> => Operations
|
||||
},
|
||||
emqx_utils_maps:deep_merge(Default, Overrides).
|
||||
|
||||
normalize_operation({K, V}) ->
|
||||
#{<<"key">> => bin(K), <<"value">> => bin(V)}.
|
||||
|
||||
dummy_operation() ->
|
||||
topic_operation(<<"concat([topic, '/', payload.t])">>).
|
||||
|
||||
topic_operation(VariformExpr) ->
|
||||
operation(topic, VariformExpr).
|
||||
|
||||
operation(Key, VariformExpr) ->
|
||||
{Key, VariformExpr}.
|
||||
|
||||
%%------------------------------------------------------------------------------
|
||||
%% Test cases
|
||||
%%------------------------------------------------------------------------------
|
||||
|
||||
schema_test_() ->
|
||||
[
|
||||
{"topics is always a list 1",
|
||||
?_assertMatch(
|
||||
[#{<<"topics">> := [<<"t/1">>]}],
|
||||
parse_and_check([
|
||||
transformation(
|
||||
<<"foo">>,
|
||||
[dummy_operation()],
|
||||
#{<<"topics">> => <<"t/1">>}
|
||||
)
|
||||
])
|
||||
)},
|
||||
{"topics is always a list 2",
|
||||
?_assertMatch(
|
||||
[#{<<"topics">> := [<<"t/1">>]}],
|
||||
parse_and_check([
|
||||
transformation(
|
||||
<<"foo">>,
|
||||
[dummy_operation()],
|
||||
#{<<"topics">> => [<<"t/1">>]}
|
||||
)
|
||||
])
|
||||
)},
|
||||
{"names are unique",
|
||||
?_assertThrow(
|
||||
{_Schema, [
|
||||
#{
|
||||
reason := <<"duplicated name:", _/binary>>,
|
||||
path := ?TRANSFORMATIONS_PATH,
|
||||
kind := validation_error
|
||||
}
|
||||
]},
|
||||
parse_and_check([
|
||||
transformation(<<"foo">>, [dummy_operation()]),
|
||||
transformation(<<"foo">>, [dummy_operation()])
|
||||
])
|
||||
)},
|
||||
{"operations must be non-empty",
|
||||
?_assertThrow(
|
||||
{_Schema, [
|
||||
#{
|
||||
reason := <<"at least one operation must be defined">>,
|
||||
kind := validation_error
|
||||
}
|
||||
]},
|
||||
parse_and_check([
|
||||
transformation(
|
||||
<<"foo">>,
|
||||
[]
|
||||
)
|
||||
])
|
||||
)},
|
||||
{"bogus check type: decoder",
|
||||
?_assertThrow(
|
||||
{_Schema, [
|
||||
#{
|
||||
expected := <<"none", _/binary>>,
|
||||
kind := validation_error,
|
||||
field_name := type
|
||||
}
|
||||
]},
|
||||
parse_and_check([
|
||||
transformation(<<"foo">>, [dummy_operation()], #{
|
||||
<<"payload_decoder">> => #{<<"type">> => <<"foo">>}
|
||||
})
|
||||
])
|
||||
)},
|
||||
{"bogus check type: encoder",
|
||||
?_assertThrow(
|
||||
{_Schema, [
|
||||
#{
|
||||
expected := <<"none", _/binary>>,
|
||||
kind := validation_error,
|
||||
field_name := type
|
||||
}
|
||||
]},
|
||||
parse_and_check([
|
||||
transformation(<<"foo">>, [dummy_operation()], #{
|
||||
<<"payload_encoder">> => #{<<"type">> => <<"foo">>}
|
||||
})
|
||||
])
|
||||
)}
|
||||
].
|
||||
|
||||
invalid_names_test_() ->
|
||||
[
|
||||
{InvalidName,
|
||||
?_assertThrow(
|
||||
{_Schema, [
|
||||
#{
|
||||
kind := validation_error,
|
||||
path := "message_transformation.transformations.1.name"
|
||||
}
|
||||
]},
|
||||
parse_and_check([transformation(InvalidName, [dummy_operation()])])
|
||||
)}
|
||||
|| InvalidName <- [
|
||||
<<"">>,
|
||||
<<"_name">>,
|
||||
<<"name$">>,
|
||||
<<"name!">>,
|
||||
<<"some name">>,
|
||||
<<"nãme"/utf8>>,
|
||||
<<"test_哈哈"/utf8>>,
|
||||
%% long name
|
||||
binary:copy(<<"a">>, 256)
|
||||
]
|
||||
].
|
|
@ -24,10 +24,13 @@
|
|||
-define(PROMETHEUS_DATA_INTEGRATION_COLLECTOR, emqx_prometheus_data_integration).
|
||||
-define(PROMETHEUS_SCHEMA_VALIDATION_REGISTRY, '/prometheus/schema_validation').
|
||||
-define(PROMETHEUS_SCHEMA_VALIDATION_COLLECTOR, emqx_prometheus_schema_validation).
|
||||
-define(PROMETHEUS_MESSAGE_TRANSFORMATION_REGISTRY, '/prometheus/message_transformation').
|
||||
-define(PROMETHEUS_MESSAGE_TRANSFORMATION_COLLECTOR, emqx_prometheus_message_transformation).
|
||||
|
||||
-if(?EMQX_RELEASE_EDITION == ee).
|
||||
-define(PROMETHEUS_EE_REGISTRIES, [
|
||||
?PROMETHEUS_SCHEMA_VALIDATION_REGISTRY
|
||||
?PROMETHEUS_SCHEMA_VALIDATION_REGISTRY,
|
||||
?PROMETHEUS_MESSAGE_TRANSFORMATION_REGISTRY
|
||||
]).
|
||||
%% ELSE if(?EMQX_RELEASE_EDITION == ee).
|
||||
-else.
|
||||
|
|
|
@ -2,7 +2,7 @@
|
|||
{application, emqx_prometheus, [
|
||||
{description, "Prometheus for EMQX"},
|
||||
% strict semver, bump manually!
|
||||
{vsn, "5.2.0"},
|
||||
{vsn, "5.2.1"},
|
||||
{modules, []},
|
||||
{registered, [emqx_prometheus_sup]},
|
||||
{applications, [kernel, stdlib, prometheus, emqx, emqx_auth, emqx_resource, emqx_management]},
|
||||
|
|
|
@ -49,7 +49,8 @@
|
|||
stats/2,
|
||||
auth/2,
|
||||
data_integration/2,
|
||||
schema_validation/2
|
||||
schema_validation/2,
|
||||
message_transformation/2
|
||||
]).
|
||||
|
||||
-export([lookup_from_local_nodes/3]).
|
||||
|
@ -73,7 +74,10 @@ paths() ->
|
|||
|
||||
-if(?EMQX_RELEASE_EDITION == ee).
|
||||
paths_ee() ->
|
||||
["/prometheus/schema_validation"].
|
||||
[
|
||||
"/prometheus/schema_validation",
|
||||
"/prometheus/message_transformation"
|
||||
].
|
||||
%% ELSE if(?EMQX_RELEASE_EDITION == ee).
|
||||
-else.
|
||||
paths_ee() ->
|
||||
|
@ -151,6 +155,19 @@ schema("/prometheus/schema_validation") ->
|
|||
responses =>
|
||||
#{200 => prometheus_data_schema()}
|
||||
}
|
||||
};
|
||||
schema("/prometheus/message_transformation") ->
|
||||
#{
|
||||
'operationId' => message_transformation,
|
||||
get =>
|
||||
#{
|
||||
description => ?DESC(get_prom_message_transformation),
|
||||
tags => ?TAGS,
|
||||
parameters => [ref(mode)],
|
||||
security => security(),
|
||||
responses =>
|
||||
#{200 => prometheus_data_schema()}
|
||||
}
|
||||
}.
|
||||
|
||||
security() ->
|
||||
|
@ -226,6 +243,9 @@ data_integration(get, #{headers := Headers, query_string := Qs}) ->
|
|||
schema_validation(get, #{headers := Headers, query_string := Qs}) ->
|
||||
collect(emqx_prometheus_schema_validation, collect_opts(Headers, Qs)).
|
||||
|
||||
message_transformation(get, #{headers := Headers, query_string := Qs}) ->
|
||||
collect(emqx_prometheus_message_transformation, collect_opts(Headers, Qs)).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Internal funcs
|
||||
%%--------------------------------------------------------------------
|
||||
|
|
|
@ -0,0 +1,222 @@
|
|||
%%--------------------------------------------------------------------
|
||||
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
-module(emqx_prometheus_message_transformation).
|
||||
|
||||
-if(?EMQX_RELEASE_EDITION == ee).
|
||||
%% for bpapi
|
||||
-behaviour(emqx_prometheus_cluster).
|
||||
|
||||
%% Please don't remove this attribute, prometheus uses it to
|
||||
%% automatically register collectors.
|
||||
-behaviour(prometheus_collector).
|
||||
|
||||
-include("emqx_prometheus.hrl").
|
||||
-include_lib("prometheus/include/prometheus.hrl").
|
||||
|
||||
-import(
|
||||
prometheus_model_helpers,
|
||||
[
|
||||
create_mf/5,
|
||||
gauge_metrics/1,
|
||||
counter_metrics/1
|
||||
]
|
||||
).
|
||||
|
||||
-export([
|
||||
deregister_cleanup/1,
|
||||
collect_mf/2,
|
||||
collect_metrics/2
|
||||
]).
|
||||
|
||||
%% `emqx_prometheus' API
|
||||
-export([collect/1]).
|
||||
|
||||
%% `emqx_prometheus_cluster' API
|
||||
-export([
|
||||
fetch_from_local_node/1,
|
||||
fetch_cluster_consistented_data/0,
|
||||
aggre_or_zip_init_acc/0,
|
||||
logic_sum_metrics/0
|
||||
]).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Type definitions
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
-define(MG(K, MAP), maps:get(K, MAP)).
|
||||
-define(MG0(K, MAP), maps:get(K, MAP, 0)).
|
||||
|
||||
-define(metrics_data_key, message_transformation_metrics_data).
|
||||
|
||||
-define(key_enabled, emqx_message_transformation_enable).
|
||||
-define(key_matched, emqx_message_transformation_matched).
|
||||
-define(key_failed, emqx_message_transformation_failed).
|
||||
-define(key_succeeded, emqx_message_transformation_succeeded).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% `emqx_prometheus_cluster' API
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
fetch_from_local_node(Mode) ->
|
||||
Validations = emqx_message_transformation:list(),
|
||||
{node(), #{
|
||||
?metrics_data_key => to_validation_data(Mode, Validations)
|
||||
}}.
|
||||
|
||||
fetch_cluster_consistented_data() ->
|
||||
#{}.
|
||||
|
||||
aggre_or_zip_init_acc() ->
|
||||
#{
|
||||
?metrics_data_key => maps:from_keys(message_transformation_metric(names), [])
|
||||
}.
|
||||
|
||||
logic_sum_metrics() ->
|
||||
[
|
||||
?key_enabled
|
||||
].
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Collector API
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
%% @private
|
||||
deregister_cleanup(_) -> ok.
|
||||
|
||||
%% @private
|
||||
-spec collect_mf(_Registry, Callback) -> ok when
|
||||
_Registry :: prometheus_registry:registry(),
|
||||
Callback :: prometheus_collector:collect_mf_callback().
|
||||
collect_mf(?PROMETHEUS_MESSAGE_TRANSFORMATION_REGISTRY, Callback) ->
|
||||
RawData = emqx_prometheus_cluster:raw_data(?MODULE, ?GET_PROM_DATA_MODE()),
|
||||
|
||||
%% Schema Validation Metrics
|
||||
RuleMetricDs = ?MG(?metrics_data_key, RawData),
|
||||
ok = add_collect_family(Callback, message_transformation_metrics_meta(), RuleMetricDs),
|
||||
|
||||
ok;
|
||||
collect_mf(_, _) ->
|
||||
ok.
|
||||
|
||||
%% @private
|
||||
collect(<<"json">>) ->
|
||||
RawData = emqx_prometheus_cluster:raw_data(?MODULE, ?GET_PROM_DATA_MODE()),
|
||||
#{
|
||||
message_transformations => collect_json_data(?MG(?metrics_data_key, RawData))
|
||||
};
|
||||
collect(<<"prometheus">>) ->
|
||||
prometheus_text_format:format(?PROMETHEUS_MESSAGE_TRANSFORMATION_REGISTRY).
|
||||
|
||||
%%====================
|
||||
%% API Helpers
|
||||
|
||||
add_collect_family(Callback, MetricWithType, Data) ->
|
||||
_ = [add_collect_family(Name, Data, Callback, Type) || {Name, Type} <- MetricWithType],
|
||||
ok.
|
||||
|
||||
add_collect_family(Name, Data, Callback, Type) ->
|
||||
%% TODO: help document from Name
|
||||
Callback(create_mf(Name, _Help = <<"">>, Type, ?MODULE, Data)).
|
||||
|
||||
collect_metrics(Name, Metrics) ->
|
||||
collect_mv(Name, Metrics).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Collector
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
%%========================================
|
||||
%% Schema Validation Metrics
|
||||
%%========================================
|
||||
collect_mv(K = ?key_enabled, Data) -> gauge_metrics(?MG(K, Data));
|
||||
collect_mv(K = ?key_matched, Data) -> counter_metrics(?MG(K, Data));
|
||||
collect_mv(K = ?key_failed, Data) -> counter_metrics(?MG(K, Data));
|
||||
collect_mv(K = ?key_succeeded, Data) -> counter_metrics(?MG(K, Data)).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Internal functions
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
%%========================================
|
||||
%% Schema Validation Metrics
|
||||
%%========================================
|
||||
|
||||
message_transformation_metrics_meta() ->
|
||||
[
|
||||
{?key_enabled, gauge},
|
||||
{?key_matched, counter},
|
||||
{?key_failed, counter},
|
||||
{?key_succeeded, counter}
|
||||
].
|
||||
|
||||
message_transformation_metric(names) ->
|
||||
emqx_prometheus_cluster:metric_names(message_transformation_metrics_meta()).
|
||||
|
||||
to_validation_data(Mode, Validations) ->
|
||||
lists:foldl(
|
||||
fun(#{name := Name} = Validation, Acc) ->
|
||||
merge_acc_with_validations(Mode, Name, get_validation_metrics(Validation), Acc)
|
||||
end,
|
||||
maps:from_keys(message_transformation_metric(names), []),
|
||||
Validations
|
||||
).
|
||||
|
||||
merge_acc_with_validations(Mode, Id, ValidationMetrics, PointsAcc) ->
|
||||
maps:fold(
|
||||
fun(K, V, AccIn) ->
|
||||
AccIn#{K => [validation_point(Mode, Id, V) | ?MG(K, AccIn)]}
|
||||
end,
|
||||
PointsAcc,
|
||||
ValidationMetrics
|
||||
).
|
||||
|
||||
validation_point(Mode, Name, V) ->
|
||||
{with_node_label(Mode, [{validation_name, Name}]), V}.
|
||||
|
||||
get_validation_metrics(#{name := Name, enable := Enabled} = _Rule) ->
|
||||
#{counters := Counters} = emqx_message_transformation_registry:get_metrics(Name),
|
||||
#{
|
||||
?key_enabled => emqx_prometheus_cluster:boolean_to_number(Enabled),
|
||||
?key_matched => ?MG0('matched', Counters),
|
||||
?key_failed => ?MG0('failed', Counters),
|
||||
?key_succeeded => ?MG0('succeeded', Counters)
|
||||
}.
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Collect functions
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
|
||||
%% merge / zip formatting funcs for type `application/json`
|
||||
|
||||
collect_json_data(Data) ->
|
||||
emqx_prometheus_cluster:collect_json_data(Data, fun zip_json_message_transformation_metrics/3).
|
||||
|
||||
zip_json_message_transformation_metrics(Key, Points, [] = _AccIn) ->
|
||||
lists:foldl(
|
||||
fun({Labels, Metric}, AccIn2) ->
|
||||
LabelsKVMap = maps:from_list(Labels),
|
||||
Point = LabelsKVMap#{Key => Metric},
|
||||
[Point | AccIn2]
|
||||
end,
|
||||
[],
|
||||
Points
|
||||
);
|
||||
zip_json_message_transformation_metrics(Key, Points, AllResultsAcc) ->
|
||||
ThisKeyResult = lists:foldl(emqx_prometheus_cluster:point_to_map_fun(Key), [], Points),
|
||||
lists:zipwith(fun maps:merge/2, AllResultsAcc, ThisKeyResult).
|
||||
|
||||
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
|
||||
%% Helper funcs
|
||||
|
||||
with_node_label(?PROM_DATA_MODE__NODE, Labels) ->
|
||||
Labels;
|
||||
with_node_label(?PROM_DATA_MODE__ALL_NODES_AGGREGATED, Labels) ->
|
||||
Labels;
|
||||
with_node_label(?PROM_DATA_MODE__ALL_NODES_UNAGGREGATED, Labels) ->
|
||||
[{node, node()} | Labels].
|
||||
|
||||
%% END if(?EMQX_RELEASE_EDITION == ee).
|
||||
-endif.
|
|
@ -82,7 +82,8 @@ all() ->
|
|||
{group, '/prometheus/stats'},
|
||||
{group, '/prometheus/auth'},
|
||||
{group, '/prometheus/data_integration'},
|
||||
[{group, '/prometheus/schema_validation'} || emqx_release:edition() == ee]
|
||||
[{group, '/prometheus/schema_validation'} || emqx_release:edition() == ee],
|
||||
[{group, '/prometheus/message_transformation'} || emqx_release:edition() == ee]
|
||||
]).
|
||||
|
||||
groups() ->
|
||||
|
@ -101,6 +102,7 @@ groups() ->
|
|||
{'/prometheus/auth', ModeGroups},
|
||||
{'/prometheus/data_integration', ModeGroups},
|
||||
{'/prometheus/schema_validation', ModeGroups},
|
||||
{'/prometheus/message_transformation', ModeGroups},
|
||||
{?PROM_DATA_MODE__NODE, AcceptGroups},
|
||||
{?PROM_DATA_MODE__ALL_NODES_AGGREGATED, AcceptGroups},
|
||||
{?PROM_DATA_MODE__ALL_NODES_UNAGGREGATED, AcceptGroups},
|
||||
|
@ -136,6 +138,10 @@ init_per_suite(Config) ->
|
|||
{emqx_schema_validation, #{config => schema_validation_config()}}
|
||||
|| emqx_release:edition() == ee
|
||||
],
|
||||
[
|
||||
{emqx_message_transformation, #{config => message_transformation_config()}}
|
||||
|| emqx_release:edition() == ee
|
||||
],
|
||||
{emqx_prometheus, emqx_prometheus_SUITE:legacy_conf_default()}
|
||||
]),
|
||||
#{
|
||||
|
@ -168,6 +174,8 @@ init_per_group('/prometheus/data_integration', Config) ->
|
|||
[{module, emqx_prometheus_data_integration} | Config];
|
||||
init_per_group('/prometheus/schema_validation', Config) ->
|
||||
[{module, emqx_prometheus_schema_validation} | Config];
|
||||
init_per_group('/prometheus/message_transformation', Config) ->
|
||||
[{module, emqx_prometheus_message_transformation} | Config];
|
||||
init_per_group(?PROM_DATA_MODE__NODE, Config) ->
|
||||
[{mode, ?PROM_DATA_MODE__NODE} | Config];
|
||||
init_per_group(?PROM_DATA_MODE__ALL_NODES_AGGREGATED, Config) ->
|
||||
|
@ -357,6 +365,8 @@ metric_meta(<<"emqx_action_", _Tail/binary>>) -> ?meta(1, 1, 2);
|
|||
metric_meta(<<"emqx_connector_", _Tail/binary>>) -> ?meta(1, 1, 2);
|
||||
%% `/prometheus/schema_validation`
|
||||
metric_meta(<<"emqx_schema_validation_", _Tail/binary>>) -> ?meta(1, 1, 2);
|
||||
%% `/prometheus/message_transformation`
|
||||
metric_meta(<<"emqx_message_transformation_", _Tail/binary>>) -> ?meta(1, 1, 2);
|
||||
%% normal emqx metrics
|
||||
metric_meta(<<"emqx_", _Tail/binary>>) -> ?meta(0, 0, 1);
|
||||
metric_meta(_) -> #{}.
|
||||
|
@ -840,6 +850,23 @@ assert_json_data__schema_validations(Ms, _) ->
|
|||
Ms
|
||||
).
|
||||
|
||||
assert_json_data__message_transformations(Ms, _) ->
|
||||
lists:foreach(
|
||||
fun(M) ->
|
||||
?assertMatch(
|
||||
#{
|
||||
validation_name := _,
|
||||
emqx_message_transformation_enable := _,
|
||||
emqx_message_transformation_matched := _,
|
||||
emqx_message_transformation_failed := _,
|
||||
emqx_message_transformation_succeeded := _
|
||||
},
|
||||
M
|
||||
)
|
||||
end,
|
||||
Ms
|
||||
).
|
||||
|
||||
schema_validation_config() ->
|
||||
Validation = #{
|
||||
<<"enable">> => true,
|
||||
|
@ -860,5 +887,24 @@ schema_validation_config() ->
|
|||
}
|
||||
}.
|
||||
|
||||
message_transformation_config() ->
|
||||
Transformation = #{
|
||||
<<"enable">> => true,
|
||||
<<"name">> => <<"my_transformation">>,
|
||||
<<"topics">> => [<<"t/#">>],
|
||||
<<"failure_action">> => <<"drop">>,
|
||||
<<"operations">> => [
|
||||
#{
|
||||
<<"key">> => <<"topic">>,
|
||||
<<"value">> => <<"concat([topic, '/', payload.t])">>
|
||||
}
|
||||
]
|
||||
},
|
||||
#{
|
||||
<<"message_transformation">> => #{
|
||||
<<"transformations">> => [Transformation]
|
||||
}
|
||||
}.
|
||||
|
||||
stop_apps(Apps) ->
|
||||
lists:foreach(fun application:stop/1, Apps).
|
||||
|
|
|
@ -45,6 +45,7 @@
|
|||
on_session_unsubscribed/4,
|
||||
on_message_publish/2,
|
||||
on_message_dropped/4,
|
||||
on_message_transformation_failed/3,
|
||||
on_schema_validation_failed/3,
|
||||
on_message_delivered/3,
|
||||
on_message_acked/3,
|
||||
|
@ -80,6 +81,7 @@ event_names() ->
|
|||
'message.delivered',
|
||||
'message.acked',
|
||||
'message.dropped',
|
||||
'message.transformation_failed',
|
||||
'schema.validation_failed',
|
||||
'delivery.dropped'
|
||||
].
|
||||
|
@ -96,6 +98,7 @@ event_topics_enum() ->
|
|||
'$events/message_delivered',
|
||||
'$events/message_acked',
|
||||
'$events/message_dropped',
|
||||
'$events/message_transformation_failed',
|
||||
'$events/schema_validation_failed',
|
||||
'$events/delivery_dropped'
|
||||
% '$events/message_publish' % not possible to use in SELECT FROM
|
||||
|
@ -237,6 +240,19 @@ on_message_dropped(Message, _, Reason, Conf) ->
|
|||
end,
|
||||
{ok, Message}.
|
||||
|
||||
on_message_transformation_failed(Message, TransformationContext, Conf) ->
|
||||
case ignore_sys_message(Message) of
|
||||
true ->
|
||||
ok;
|
||||
false ->
|
||||
apply_event(
|
||||
'message.transformation_failed',
|
||||
fun() -> eventmsg_transformation_failed(Message, TransformationContext) end,
|
||||
Conf
|
||||
)
|
||||
end,
|
||||
{ok, Message}.
|
||||
|
||||
on_schema_validation_failed(Message, ValidationContext, Conf) ->
|
||||
case ignore_sys_message(Message) of
|
||||
true ->
|
||||
|
@ -535,6 +551,38 @@ eventmsg_dropped(
|
|||
#{headers => Headers}
|
||||
).
|
||||
|
||||
eventmsg_transformation_failed(
|
||||
Message = #message{
|
||||
id = Id,
|
||||
from = ClientId,
|
||||
qos = QoS,
|
||||
flags = Flags,
|
||||
topic = Topic,
|
||||
headers = Headers,
|
||||
payload = Payload,
|
||||
timestamp = Timestamp
|
||||
},
|
||||
TransformationContext
|
||||
) ->
|
||||
#{name := TransformationName} = TransformationContext,
|
||||
with_basic_columns(
|
||||
'message.transformation_failed',
|
||||
#{
|
||||
id => emqx_guid:to_hexstr(Id),
|
||||
transformation => TransformationName,
|
||||
clientid => ClientId,
|
||||
username => emqx_message:get_header(username, Message, undefined),
|
||||
payload => Payload,
|
||||
peername => ntoa(emqx_message:get_header(peername, Message, undefined)),
|
||||
topic => Topic,
|
||||
qos => QoS,
|
||||
flags => Flags,
|
||||
pub_props => printable_maps(emqx_message:get_header(properties, Message, #{})),
|
||||
publish_received_at => Timestamp
|
||||
},
|
||||
#{headers => Headers}
|
||||
).
|
||||
|
||||
eventmsg_validation_failed(
|
||||
Message = #message{
|
||||
id = Id,
|
||||
|
@ -737,9 +785,17 @@ event_info_schema_validation_failed() ->
|
|||
{<<"messages that do not pass configured validations">>, <<"未通过验证的消息"/utf8>>},
|
||||
<<"SELECT * FROM \"$events/schema_validation_failed\" WHERE topic =~ 't/#'">>
|
||||
).
|
||||
event_info_message_transformation_failed() ->
|
||||
event_info_common(
|
||||
'message.transformation_failed',
|
||||
{<<"message transformation failed">>, <<"message 验证失败"/utf8>>},
|
||||
{<<"messages that do not pass configured transformation">>, <<"未通过验证的消息"/utf8>>},
|
||||
<<"SELECT * FROM \"$events/message_transformation_failed\" WHERE topic =~ 't/#'">>
|
||||
).
|
||||
ee_event_info() ->
|
||||
[
|
||||
event_info_schema_validation_failed()
|
||||
event_info_schema_validation_failed(),
|
||||
event_info_message_transformation_failed()
|
||||
].
|
||||
-else.
|
||||
%% END (?EMQX_RELEASE_EDITION == ee).
|
||||
|
@ -933,6 +989,9 @@ test_columns(Event) ->
|
|||
-if(?EMQX_RELEASE_EDITION == ee).
|
||||
ee_test_columns('schema.validation_failed') ->
|
||||
[{<<"validation">>, <<"myvalidation">>}] ++
|
||||
test_columns('message.publish');
|
||||
ee_test_columns('message.transformation_failed') ->
|
||||
[{<<"transformation">>, <<"mytransformation">>}] ++
|
||||
test_columns('message.publish').
|
||||
%% ELSE (?EMQX_RELEASE_EDITION == ee).
|
||||
-else.
|
||||
|
@ -997,6 +1056,23 @@ columns_with_exam('schema.validation_failed') ->
|
|||
{<<"timestamp">>, erlang:system_time(millisecond)},
|
||||
{<<"node">>, node()}
|
||||
];
|
||||
columns_with_exam('message.transformation_failed') ->
|
||||
[
|
||||
{<<"event">>, 'message.transformation_failed'},
|
||||
{<<"validation">>, <<"my_transformation">>},
|
||||
{<<"id">>, emqx_guid:to_hexstr(emqx_guid:gen())},
|
||||
{<<"clientid">>, <<"c_emqx">>},
|
||||
{<<"username">>, <<"u_emqx">>},
|
||||
{<<"payload">>, <<"{\"msg\": \"hello\"}">>},
|
||||
{<<"peername">>, <<"192.168.0.10:56431">>},
|
||||
{<<"topic">>, <<"t/a">>},
|
||||
{<<"qos">>, 1},
|
||||
{<<"flags">>, #{}},
|
||||
{<<"publish_received_at">>, erlang:system_time(millisecond)},
|
||||
columns_example_props(pub_props),
|
||||
{<<"timestamp">>, erlang:system_time(millisecond)},
|
||||
{<<"node">>, node()}
|
||||
];
|
||||
columns_with_exam('delivery.dropped') ->
|
||||
[
|
||||
{<<"event">>, 'delivery.dropped'},
|
||||
|
@ -1200,6 +1276,7 @@ hook_fun('session.unsubscribed') -> fun ?MODULE:on_session_unsubscribed/4;
|
|||
hook_fun('message.delivered') -> fun ?MODULE:on_message_delivered/3;
|
||||
hook_fun('message.acked') -> fun ?MODULE:on_message_acked/3;
|
||||
hook_fun('message.dropped') -> fun ?MODULE:on_message_dropped/4;
|
||||
hook_fun('message.transformation_failed') -> fun ?MODULE:on_message_transformation_failed/3;
|
||||
hook_fun('schema.validation_failed') -> fun ?MODULE:on_schema_validation_failed/3;
|
||||
hook_fun('delivery.dropped') -> fun ?MODULE:on_delivery_dropped/4;
|
||||
hook_fun('message.publish') -> fun ?MODULE:on_message_publish/2;
|
||||
|
@ -1231,6 +1308,7 @@ event_name(<<"$events/session_unsubscribed">>) -> 'session.unsubscribed';
|
|||
event_name(<<"$events/message_delivered">>) -> 'message.delivered';
|
||||
event_name(<<"$events/message_acked">>) -> 'message.acked';
|
||||
event_name(<<"$events/message_dropped">>) -> 'message.dropped';
|
||||
event_name(<<"$events/message_transformation_failed">>) -> 'message.transformation_failed';
|
||||
event_name(<<"$events/schema_validation_failed">>) -> 'schema.validation_failed';
|
||||
event_name(<<"$events/delivery_dropped">>) -> 'delivery.dropped';
|
||||
event_name(_) -> 'message.publish'.
|
||||
|
@ -1246,6 +1324,7 @@ event_topic('session.unsubscribed') -> <<"$events/session_unsubscribed">>;
|
|||
event_topic('message.delivered') -> <<"$events/message_delivered">>;
|
||||
event_topic('message.acked') -> <<"$events/message_acked">>;
|
||||
event_topic('message.dropped') -> <<"$events/message_dropped">>;
|
||||
event_topic('message.transformation_failed') -> <<"$events/message_transformation_failed">>;
|
||||
event_topic('schema.validation_failed') -> <<"$events/schema_validation_failed">>;
|
||||
event_topic('delivery.dropped') -> <<"$events/delivery_dropped">>;
|
||||
event_topic('message.publish') -> <<"$events/message_publish">>.
|
||||
|
|
|
@ -27,7 +27,7 @@
|
|||
inc_action_metrics/2
|
||||
]).
|
||||
|
||||
%% Internal exports used by schema validation
|
||||
%% Internal exports used by schema validation and message transformation.
|
||||
-export([evaluate_select/3, clear_rule_payload/0]).
|
||||
|
||||
-import(
|
||||
|
|
|
@ -2,7 +2,7 @@ Business Source License 1.1
|
|||
|
||||
Licensor: Hangzhou EMQ Technologies Co., Ltd.
|
||||
Licensed Work: EMQX Enterprise Edition
|
||||
The Licensed Work is (c) 2023
|
||||
The Licensed Work is (c) 2024
|
||||
Hangzhou EMQ Technologies Co., Ltd.
|
||||
Additional Use Grant: Students and educators are granted right to copy,
|
||||
modify, and create derivative work for research
|
||||
|
|
|
@ -6,7 +6,7 @@ the message without further processing, or to disconnect the offending client as
|
|||
|
||||
# Documentation
|
||||
|
||||
Refer to [Message
|
||||
Refer to [Schema
|
||||
Validation](https://docs.emqx.com/en/enterprise/latest/data-integration/schema-validation.html)
|
||||
for more information about the semantics and checks available.
|
||||
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
{application, emqx_schema_validation, [
|
||||
{description, "EMQX Schema Validation"},
|
||||
{vsn, "0.1.0"},
|
||||
{vsn, "0.1.1"},
|
||||
{registered, [emqx_schema_validation_sup, emqx_schema_validation_registry]},
|
||||
{mod, {emqx_schema_validation_app, []}},
|
||||
{applications, [
|
||||
|
|
|
@ -144,7 +144,7 @@ delete(Name) ->
|
|||
|
||||
-spec register_hooks() -> ok.
|
||||
register_hooks() ->
|
||||
emqx_hooks:put('message.publish', {?MODULE, on_message_publish, []}, ?HP_MSG_VALIDATION).
|
||||
emqx_hooks:put('message.publish', {?MODULE, on_message_publish, []}, ?HP_SCHEMA_VALIDATION).
|
||||
|
||||
-spec unregister_hooks() -> ok.
|
||||
unregister_hooks() ->
|
||||
|
|
|
@ -69,7 +69,7 @@ render(Expression, Bindings) ->
|
|||
render(Expression, Bindings, #{}).
|
||||
|
||||
render(#{form := Form}, Bindings, Opts) ->
|
||||
eval_as_string(Form, Bindings, Opts);
|
||||
eval_render(Form, Bindings, Opts);
|
||||
render(Expression, Bindings, Opts) ->
|
||||
case compile(Expression) of
|
||||
{ok, Compiled} ->
|
||||
|
@ -78,9 +78,16 @@ render(Expression, Bindings, Opts) ->
|
|||
{error, Reason}
|
||||
end.
|
||||
|
||||
eval_as_string(Expr, Bindings, _Opts) ->
|
||||
eval_render(Expr, Bindings, Opts) ->
|
||||
EvalAsStr = maps:get(eval_as_string, Opts, true),
|
||||
try
|
||||
{ok, return_str(eval(Expr, Bindings, #{}))}
|
||||
Result = eval(Expr, Bindings, #{}),
|
||||
case EvalAsStr of
|
||||
true ->
|
||||
{ok, return_str(Result)};
|
||||
false ->
|
||||
{ok, Result}
|
||||
end
|
||||
catch
|
||||
throw:Reason ->
|
||||
{error, Reason};
|
||||
|
@ -88,7 +95,7 @@ eval_as_string(Expr, Bindings, _Opts) ->
|
|||
{error, #{exception => C, reason => E, stack_trace => S}}
|
||||
end.
|
||||
|
||||
%% Force the expression to return binary string.
|
||||
%% Force the expression to return binary string (in most cases).
|
||||
return_str(Str) when is_binary(Str) -> Str;
|
||||
return_str(Num) when is_integer(Num) -> integer_to_binary(Num);
|
||||
return_str(Num) when is_float(Num) -> float_to_binary(Num, [{decimals, 10}, compact]);
|
||||
|
@ -313,7 +320,7 @@ assert_module_allowed(Mod) ->
|
|||
ok;
|
||||
false ->
|
||||
throw(#{
|
||||
reason => unallowed_veriform_module,
|
||||
reason => unallowed_variform_module,
|
||||
module => Mod
|
||||
})
|
||||
end.
|
||||
|
|
|
@ -22,6 +22,7 @@ Rootsymbol
|
|||
expr -> call_or_var : '$1'.
|
||||
|
||||
%% Function call or variable
|
||||
call_or_var -> identifier '(' ')' : {call, element(3, '$1'), []}.
|
||||
call_or_var -> identifier '(' args ')' : {call, element(3, '$1'), '$3'}.
|
||||
call_or_var -> identifier : {var, element(3, '$1')}.
|
||||
|
||||
|
|
|
@ -126,7 +126,7 @@ inject_allowed_module_test() ->
|
|||
render(atom_to_list(?MODULE) ++ ".concat('a','b')", #{})
|
||||
),
|
||||
?assertMatch(
|
||||
{error, #{reason := unallowed_veriform_module, module := emqx}},
|
||||
{error, #{reason := unallowed_variform_module, module := emqx}},
|
||||
render("emqx.concat('a','b')", #{})
|
||||
)
|
||||
after
|
||||
|
@ -231,8 +231,12 @@ syntax_error_test_() ->
|
|||
{"const string single quote", fun() -> ?assertMatch(?SYNTAX_ERROR, render("'a'", #{})) end},
|
||||
{"const string double quote", fun() ->
|
||||
?assertMatch(?SYNTAX_ERROR, render(<<"\"a\"">>, #{}))
|
||||
end},
|
||||
{"no arity", fun() -> ?assertMatch(?SYNTAX_ERROR, render("concat()", #{})) end}
|
||||
end}
|
||||
].
|
||||
|
||||
maps_test_() ->
|
||||
[
|
||||
{"arity zero", ?_assertEqual({ok, <<"0">>}, render(<<"maps.size(maps.new())">>, #{}))}
|
||||
].
|
||||
|
||||
render(Expression, Bindings) ->
|
||||
|
|
1
mix.exs
1
mix.exs
|
@ -190,6 +190,7 @@ defmodule EMQXUmbrella.MixProject do
|
|||
:emqx_bridge_s3,
|
||||
:emqx_schema_registry,
|
||||
:emqx_schema_validation,
|
||||
:emqx_message_transformation,
|
||||
:emqx_enterprise,
|
||||
:emqx_bridge_kinesis,
|
||||
:emqx_bridge_azure_event_hub,
|
||||
|
|
|
@ -117,6 +117,7 @@ is_community_umbrella_app("apps/emqx_gateway_ocpp") -> false;
|
|||
is_community_umbrella_app("apps/emqx_gateway_jt808") -> false;
|
||||
is_community_umbrella_app("apps/emqx_bridge_syskeeper") -> false;
|
||||
is_community_umbrella_app("apps/emqx_schema_validation") -> false;
|
||||
is_community_umbrella_app("apps/emqx_message_transformation") -> false;
|
||||
is_community_umbrella_app("apps/emqx_eviction_agent") -> false;
|
||||
is_community_umbrella_app("apps/emqx_node_rebalance") -> false;
|
||||
is_community_umbrella_app(_) -> true.
|
||||
|
|
|
@ -0,0 +1,36 @@
|
|||
emqx_message_transformation_http_api {
|
||||
|
||||
list_transformations.desc:
|
||||
"""List transformations"""
|
||||
|
||||
lookup_transformation.desc:
|
||||
"""Lookup a transformation"""
|
||||
|
||||
update_transformation.desc:
|
||||
"""Update a transformation"""
|
||||
|
||||
delete_transformation.desc:
|
||||
"""Delete a transformation"""
|
||||
|
||||
append_transformation.desc:
|
||||
"""Append a new transformation to the list of transformations"""
|
||||
|
||||
reorder_transformations.desc:
|
||||
"""Reorder of all transformations"""
|
||||
|
||||
enable_disable_transformation.desc:
|
||||
"""Enable or disable a particular transformation"""
|
||||
|
||||
get_transformation_metrics.desc:
|
||||
"""Get metrics for a particular transformation"""
|
||||
|
||||
reset_transformation_metrics.desc:
|
||||
"""Reset metrics for a particular transformation"""
|
||||
|
||||
param_path_name.desc:
|
||||
"""Transformation name"""
|
||||
|
||||
param_path_enable.desc:
|
||||
"""Enable or disable transformation"""
|
||||
|
||||
}
|
|
@ -30,4 +30,9 @@ get_prom_schema_validation.desc:
|
|||
get_prom_schema_validation.label:
|
||||
"""Prometheus Metrics for Schema Validation"""
|
||||
|
||||
get_prom_message_transformation.desc:
|
||||
"""Get Prometheus Metrics for Message Validation"""
|
||||
get_prom_message_transformation.label:
|
||||
"""Prometheus Metrics for Message Validation"""
|
||||
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue