diff --git a/Makefile b/Makefile index eb745302d..cb14fdf99 100644 --- a/Makefile +++ b/Makefile @@ -8,7 +8,7 @@ export EMQX_DEFAULT_BUILDER = ghcr.io/emqx/emqx-builder/5.0-8:1.13.3-24.2.1-1-al export EMQX_DEFAULT_RUNNER = alpine:3.14 export OTP_VSN ?= $(shell $(CURDIR)/scripts/get-otp-vsn.sh) export ELIXIR_VSN ?= $(shell $(CURDIR)/scripts/get-elixir-vsn.sh) -export EMQX_DASHBOARD_VERSION ?= v0.21.0 +export EMQX_DASHBOARD_VERSION ?= v0.23.0 export DOCKERFILE := deploy/docker/Dockerfile export EMQX_REL_FORM ?= tgz ifeq ($(OS),Windows_NT) diff --git a/apps/emqx/include/http_api.hrl b/apps/emqx/include/http_api.hrl index cb0c49df0..4592a2a8f 100644 --- a/apps/emqx/include/http_api.hrl +++ b/apps/emqx/include/http_api.hrl @@ -16,8 +16,9 @@ %% Bad Request -define(BAD_REQUEST, 'BAD_REQUEST'). +-define(NOT_MATCH, 'NOT_MATCH'). --define(ALREADY_EXISTS, 'ALREADY_EXISTS'). +-define(ALREADY_EXISTS, 'ALREADY_EXISTS'). -define(BAD_CONFIG_SCHEMA, 'BAD_CONFIG_SCHEMA'). -define(BAD_LISTENER_ID, 'BAD_LISTENER_ID'). -define(BAD_NODE_NAME, 'BAD_NODE_NAME'). @@ -49,7 +50,8 @@ %% All codes -define(ERROR_CODES, [ {'BAD_REQUEST', <<"Request parameters are not legal">>} - , {'ALREADY_EXISTS', <<"Resource already existed">>} + , {'NOT_MATCH', <<"Conditions are not matched">>} + , {'ALREADY_EXISTS', <<"Resource already existed">>} , {'BAD_CONFIG_SCHEMA', <<"Configuration data is not legal">>} , {'BAD_LISTENER_ID', <<"Bad listener ID">>} , {'BAD_NODE_NAME', <<"Bad Node Name">>} diff --git a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_mod.erl b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_mod.erl index 591bfa8c3..fcad0ef7a 100644 --- a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_mod.erl +++ b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_mod.erl @@ -30,7 +30,7 @@ %% callbacks for emqtt -export([ handle_puback/2 - , handle_publish/2 + , handle_publish/3 , handle_disconnected/2 ]). @@ -52,7 +52,7 @@ start(Config) -> Mountpoint = maps:get(receive_mountpoint, Config, undefined), Subscriptions = maps:get(subscriptions, Config, undefined), Vars = emqx_connector_mqtt_msg:make_pub_vars(Mountpoint, Subscriptions), - Handlers = make_hdlr(Parent, Vars), + Handlers = make_hdlr(Parent, Vars, #{server => ip_port_to_server(Host, Port)}), Config1 = Config#{ msg_handler => Handlers, host => Host, @@ -161,12 +161,12 @@ handle_puback(#{packet_id := PktId, reason_code := RC}, _Parent) -> ?SLOG(warning, #{msg => "publish_to_remote_node_falied", packet_id => PktId, reason_code => RC}). -handle_publish(Msg, undefined) -> +handle_publish(Msg, undefined, _Opts) -> ?SLOG(error, #{msg => "cannot_publish_to_local_broker_as" "_'ingress'_is_not_configured", message => Msg}); -handle_publish(#{properties := Props} = Msg0, Vars) -> - Msg = format_msg_received(Msg0), +handle_publish(#{properties := Props} = Msg0, Vars, Opts) -> + Msg = format_msg_received(Msg0, Opts), ?SLOG(debug, #{msg => "publish_to_local_broker", message => Msg, vars => Vars}), case Vars of @@ -179,9 +179,9 @@ handle_publish(#{properties := Props} = Msg0, Vars) -> handle_disconnected(Reason, Parent) -> Parent ! {disconnected, self(), Reason}. -make_hdlr(Parent, Vars) -> +make_hdlr(Parent, Vars, Opts) -> #{puback => {fun ?MODULE:handle_puback/2, [Parent]}, - publish => {fun ?MODULE:handle_publish/2, [Vars]}, + publish => {fun ?MODULE:handle_publish/3, [Vars, Opts]}, disconnected => {fun ?MODULE:handle_disconnected/2, [Parent]} }. @@ -212,8 +212,9 @@ maybe_publish_to_local_broker(#{topic := Topic} = Msg, #{remote_topic := SubTopi end. format_msg_received(#{dup := Dup, payload := Payload, properties := Props, - qos := QoS, retain := Retain, topic := Topic}) -> + qos := QoS, retain := Retain, topic := Topic}, #{server := Server}) -> #{ id => emqx_guid:to_hexstr(emqx_guid:gen()) + , server => Server , payload => Payload , topic => Topic , qos => QoS @@ -236,3 +237,10 @@ printable_maps(Headers) -> }; (K, V0, AccIn) -> AccIn#{K => V0} end, #{}, Headers). + +ip_port_to_server(Host, Port) -> + HostStr = case inet:ntoa(Host) of + {error, einval} -> Host; + IPStr -> IPStr + end, + list_to_binary(io_lib:format("~s:~w", [HostStr, Port])). diff --git a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_schema.erl b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_schema.erl index 2c7cfe012..7fb657ee6 100644 --- a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_schema.erl +++ b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_schema.erl @@ -28,6 +28,8 @@ , egress_desc/0 ]). +-export([non_empty_string/1]). + -import(emqx_schema, [mk_duration/2]). roots() -> @@ -101,6 +103,7 @@ fields("ingress") -> [ {remote_topic, sc(binary(), #{ required => true + , validator => fun ?MODULE:non_empty_string/1 , desc => "Receive messages from which topic of the remote broker" })} , {remote_qos, @@ -110,7 +113,8 @@ fields("ingress") -> })} , {local_topic, sc(binary(), - #{ desc => """ + #{ validator => fun ?MODULE:non_empty_string/1 + , desc => """ Send messages to which topic of the local broker.
Template with variables is allowed. """ @@ -135,10 +139,12 @@ fields("egress") -> [ {local_topic, sc(binary(), #{ desc => "The local topic to be forwarded to the remote broker" + , validator => fun ?MODULE:non_empty_string/1 })} , {remote_topic, sc(binary(), #{ default => <<"${topic}">> + , validator => fun ?MODULE:non_empty_string/1 , desc => """ Forward to which topic of the remote broker.
Template with variables is allowed. @@ -233,5 +239,10 @@ Template with variables is allowed.""" qos() -> hoconsc:union([emqx_schema:qos(), binary()]). +non_empty_string(<<>>) -> {error, empty_string_not_allowed}; +non_empty_string("") -> {error, empty_string_not_allowed}; +non_empty_string(S) when is_binary(S); is_list(S) -> ok; +non_empty_string(_) -> {error, invalid_string}. + sc(Type, Meta) -> hoconsc:mk(Type, Meta). ref(Field) -> hoconsc:ref(?MODULE, Field). diff --git a/apps/emqx_plugin_libs/src/emqx_plugin_libs_metrics.erl b/apps/emqx_plugin_libs/src/emqx_plugin_libs_metrics.erl index 7eefe8701..e437961b8 100644 --- a/apps/emqx_plugin_libs/src/emqx_plugin_libs_metrics.erl +++ b/apps/emqx_plugin_libs/src/emqx_plugin_libs_metrics.erl @@ -58,13 +58,13 @@ -export_type([metrics/0, handler_name/0, metric_id/0]). -type rate() :: #{ - current => float(), - max => float(), - last5m => float() + current := float(), + max := float(), + last5m := float() }. -type metrics() :: #{ - counters => #{atom() => integer()}, - rate => #{atom() => rate()} + counters := #{atom() => integer()}, + rate := #{atom() => rate()} }. -type handler_name() :: atom(). -type metric_id() :: binary(). @@ -158,10 +158,10 @@ init(Name) -> {ok, #state{}}. handle_call({get_rate, _Id}, _From, State = #state{rates = undefined}) -> - {reply, #{}, State}; + {reply, make_rate(0, 0, 0), State}; handle_call({get_rate, Id}, _From, State = #state{rates = Rates}) -> {reply, case maps:get(Id, Rates, undefined) of - undefined -> #{}; + undefined -> make_rate(0, 0, 0); RatesPerId -> format_rates_of_id(RatesPerId) end, State}; @@ -303,7 +303,13 @@ format_rates_of_id(RatesPerId) -> end, RatesPerId). format_rate(#rate{max = Max, current = Current, last5m = Last5Min}) -> - #{max => precision(Max, 2), current => precision(Current, 2), last5m => precision(Last5Min, 2)}. + make_rate(Current, Max, Last5Min). + +make_rate(Current, Max, Last5Min) -> + #{ current => precision(Current, 2) + , max => precision(Max, 2) + , last5m => precision(Last5Min, 2) + }. precision(Float, N) -> Base = math:pow(10, N), diff --git a/apps/emqx_rule_engine/src/emqx_rule_api_schema.erl b/apps/emqx_rule_engine/src/emqx_rule_api_schema.erl index 66064ac24..d676767f2 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_api_schema.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_api_schema.erl @@ -15,10 +15,11 @@ -spec check_params(map(), tag()) -> {ok, map()} | {error, term()}. check_params(Params, Tag) -> BTag = atom_to_binary(Tag), - case emqx_hocon:check(?MODULE, #{BTag => Params}) of - {ok, #{Tag := Checked}} -> - {ok, Checked}; - {error, Reason} -> + Opts = #{atom_key => true, required => false}, + try hocon_tconf:check_plain(?MODULE, #{BTag => Params}, Opts, [Tag]) of + #{Tag := Checked} -> {ok, Checked} + catch + throw : Reason -> ?SLOG(error, #{msg => "check_rule_params_failed", reason => Reason }), @@ -72,6 +73,7 @@ fields("rule_test") -> , ref("ctx_dropped") , ref("ctx_connected") , ref("ctx_disconnected") + , ref("ctx_bridge_mqtt") ]), #{desc => "The context of the event for testing", default => #{}})} @@ -204,7 +206,20 @@ fields("ctx_disconnected") -> , {"sockname", sc(binary(), #{desc => "The IP Address and Port of the Local Listener"})} , {"disconnected_at", sc(integer(), #{ desc => "The Time that this Client is Disconnected"})} - ]. + ]; + +fields("ctx_bridge_mqtt") -> + [ {"event_type", sc('$bridges/mqtt:*', #{desc => "Event Type", required => true})} + , {"id", sc(binary(), #{desc => "Message ID"})} + , {"payload", sc(binary(), #{desc => "The Message Payload"})} + , {"topic", sc(binary(), #{desc => "Message Topic"})} + , {"server", sc(binary(), #{desc => "The IP address (or hostname) and port of the MQTT broker," + " in IP:Port format"})} + , {"dup", sc(binary(), #{desc => "The DUP flag of the MQTT message"})} + , {"retain", sc(binary(), #{desc => "If is a retain message"})} + , {"message_received_at", sc(integer(), #{ + desc => "The Time that this Message is Received"})} + ] ++ [qos()]. qos() -> {"qos", sc(emqx_schema:qos(), #{desc => "The Message QoS"})}. diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl b/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl index cdc2773b4..2e3a09172 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl @@ -41,7 +41,7 @@ {ok, CheckedParams} -> EXPR; {error, REASON} -> - {400, #{code => 'BAD_ARGS', message => ?ERR_BADARGS(REASON)}} + {400, #{code => 'BAD_REQUEST', message => ?ERR_BADARGS(REASON)}} end). -define(METRICS(MATCH, PASS, FAIL, FAIL_EX, FAIL_NORES, O_TOTAL, O_FAIL, O_FAIL_OOS, O_FAIL_UNKNOWN, O_SUCC, RATE, RATE_MAX, RATE_5), @@ -85,10 +85,8 @@ api_spec() -> paths() -> ["/rule_events", "/rule_test", "/rules", "/rules/:id"]. -error_schema(Code, Message) -> - [ {code, mk(string(), #{example => Code})} - , {message, mk(string(), #{example => Message})} - ]. +error_schema(Code, Message) when is_atom(Code) -> + emqx_dashboard_swagger:error_codes([Code], list_to_binary(Message)). rule_creation_schema() -> ref(emqx_rule_api_schema, "rule_creation"). @@ -115,7 +113,7 @@ schema("/rules") -> summary => <<"Create a Rule">>, requestBody => rule_creation_schema(), responses => #{ - 400 => error_schema('BAD_ARGS', "Invalid Parameters"), + 400 => error_schema('BAD_REQUEST', "Invalid Parameters"), 201 => rule_info_schema() }} }; @@ -153,7 +151,7 @@ schema("/rules/:id") -> parameters => param_path_id(), requestBody => rule_creation_schema(), responses => #{ - 400 => error_schema('BAD_ARGS', "Invalid Parameters"), + 400 => error_schema('BAD_REQUEST', "Invalid Parameters"), 200 => rule_info_schema() } }, @@ -177,7 +175,7 @@ schema("/rule_test") -> summary => <<"Test a Rule">>, requestBody => rule_test_schema(), responses => #{ - 400 => error_schema('BAD_ARGS', "Invalid Parameters"), + 400 => error_schema('BAD_REQUEST', "Invalid Parameters"), 412 => error_schema('NOT_MATCH', "SQL Not Match"), 200 => <<"Rule Test Pass">> } @@ -201,13 +199,13 @@ param_path_id() -> '/rules'(post, #{body := Params0}) -> case maps:get(<<"id">>, Params0, list_to_binary(emqx_misc:gen_id(8))) of <<>> -> - {400, #{code => 'BAD_ARGS', message => <<"empty rule id is not allowed">>}}; + {400, #{code => 'BAD_REQUEST', message => <<"empty rule id is not allowed">>}}; Id -> Params = filter_out_request_body(Params0), ConfPath = emqx_rule_engine:config_key_path() ++ [Id], case emqx_rule_engine:get_rule(Id) of {ok, _Rule} -> - {400, #{code => 'BAD_ARGS', message => <<"rule id already exists">>}}; + {400, #{code => 'BAD_REQUEST', message => <<"rule id already exists">>}}; not_found -> case emqx_conf:update(ConfPath, Params, #{}) of {ok, #{post_config_update := #{emqx_rule_engine := AllRules}}} -> @@ -216,7 +214,7 @@ param_path_id() -> {error, Reason} -> ?SLOG(error, #{msg => "create_rule_failed", id => Id, reason => Reason}), - {400, #{code => 'BAD_ARGS', message => ?ERR_BADARGS(Reason)}} + {400, #{code => 'BAD_REQUEST', message => ?ERR_BADARGS(Reason)}} end end end. @@ -224,6 +222,8 @@ param_path_id() -> '/rule_test'(post, #{body := Params}) -> ?CHECK_PARAMS(Params, rule_test, case emqx_rule_sqltester:test(CheckedParams) of {ok, Result} -> {200, Result}; + {error, {parse_error, Reason}} -> + {400, #{code => 'BAD_REQUEST', message => err_msg(Reason)}}; {error, nomatch} -> {412, #{code => 'NOT_MATCH', message => <<"SQL Not Match">>}} end). @@ -245,7 +245,7 @@ param_path_id() -> {error, Reason} -> ?SLOG(error, #{msg => "update_rule_failed", id => Id, reason => Reason}), - {400, #{code => 'BAD_ARGS', message => ?ERR_BADARGS(Reason)}} + {400, #{code => 'BAD_REQUEST', message => ?ERR_BADARGS(Reason)}} end; '/rules/:id'(delete, #{bindings := #{id := Id}}) -> @@ -255,7 +255,7 @@ param_path_id() -> {error, Reason} -> ?SLOG(error, #{msg => "delete_rule_failed", id => Id, reason => Reason}), - {500, #{code => 'BAD_ARGS', message => ?ERR_BADARGS(Reason)}} + {500, #{code => 'INTERNAL_ERROR', message => ?ERR_BADARGS(Reason)}} end. %%------------------------------------------------------------------------------ diff --git a/apps/emqx_rule_engine/src/emqx_rule_events.erl b/apps/emqx_rule_engine/src/emqx_rule_events.erl index 1b45ddb0f..10e3e41ef 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_events.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_events.erl @@ -607,7 +607,7 @@ columns_with_exam(<<"$bridges/mqtt", _/binary>> = EventName) -> [ {<<"event">>, EventName} , {<<"id">>, emqx_guid:to_hexstr(emqx_guid:gen())} , {<<"payload">>, <<"{\"msg\": \"hello\"}">>} - , {<<"peerhost">>, <<"192.168.0.10">>} + , {<<"server">>, <<"192.168.0.10:1883">>} , {<<"topic">>, <<"t/a">>} , {<<"qos">>, 1} , {<<"dup">>, false} diff --git a/apps/emqx_rule_engine/src/emqx_rule_sqltester.erl b/apps/emqx_rule_engine/src/emqx_rule_sqltester.erl index 4f036cf49..abd6a54c3 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_sqltester.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_sqltester.erl @@ -22,21 +22,25 @@ , get_selected_data/3 ]). --spec test(#{sql := binary(), context := map()}) -> {ok, map() | list()} | {error, nomatch}. +-spec test(#{sql := binary(), context := map()}) -> {ok, map() | list()} | {error, term()}. test(#{sql := Sql, context := Context}) -> - {ok, Select} = emqx_rule_sqlparser:parse(Sql), - InTopic = maps:get(topic, Context, <<>>), - EventTopics = emqx_rule_sqlparser:select_from(Select), - case lists:all(fun is_publish_topic/1, EventTopics) of - true -> - %% test if the topic matches the topic filters in the rule - case emqx_plugin_libs_rule:can_topic_match_oneof(InTopic, EventTopics) of - true -> test_rule(Sql, Select, Context, EventTopics); - false -> {error, nomatch} + case emqx_rule_sqlparser:parse(Sql) of + {ok, Select} -> + InTopic = maps:get(topic, Context, <<>>), + EventTopics = emqx_rule_sqlparser:select_from(Select), + case lists:all(fun is_publish_topic/1, EventTopics) of + true -> + %% test if the topic matches the topic filters in the rule + case emqx_plugin_libs_rule:can_topic_match_oneof(InTopic, EventTopics) of + true -> test_rule(Sql, Select, Context, EventTopics); + false -> {error, nomatch} + end; + false -> + %% the rule is for both publish and events, test it directly + test_rule(Sql, Select, Context, EventTopics) end; - false -> - %% the rule is for both publish and events, test it directly - test_rule(Sql, Select, Context, EventTopics) + {error, Reason} -> + {error, Reason} end. test_rule(Sql, Select, Context, EventTopics) ->