diff --git a/Makefile b/Makefile index ad8b97da1..49547a03b 100644 --- a/Makefile +++ b/Makefile @@ -7,7 +7,7 @@ export EMQX_DEFAULT_BUILDER = ghcr.io/emqx/emqx-builder/4.4-2:23.3.4.9-3-alpine3 export EMQX_DEFAULT_RUNNER = alpine:3.14 export OTP_VSN ?= $(shell $(CURDIR)/scripts/get-otp-vsn.sh) export PKG_VSN ?= $(shell $(CURDIR)/pkg-vsn.sh) -export EMQX_DASHBOARD_VERSION ?= v0.8.0 +export EMQX_DASHBOARD_VERSION ?= v0.10.0 export DOCKERFILE := deploy/docker/Dockerfile export DOCKERFILE_TESTING := deploy/docker/Dockerfile.testing ifeq ($(OS),Windows_NT) diff --git a/apps/emqx/src/emqx_trace/emqx_trace.erl b/apps/emqx/src/emqx_trace/emqx_trace.erl index 0d7f66323..42e4d0baf 100644 --- a/apps/emqx/src/emqx_trace/emqx_trace.erl +++ b/apps/emqx/src/emqx_trace/emqx_trace.erl @@ -194,6 +194,7 @@ format(Traces) -> end, Traces). init([]) -> + ok = mria:wait_for_tables([?TRACE]), erlang:process_flag(trap_exit, true), OriginLogLevel = emqx_logger:get_primary_log_level(), ok = filelib:ensure_dir(trace_dir()), diff --git a/apps/emqx_dashboard/src/emqx_dashboard_collection.erl b/apps/emqx_dashboard/src/emqx_dashboard_collection.erl index dc9c894b6..f8937dee9 100644 --- a/apps/emqx_dashboard/src/emqx_dashboard_collection.erl +++ b/apps/emqx_dashboard/src/emqx_dashboard_collection.erl @@ -22,7 +22,7 @@ -export([get_collect/0]). --export([get_local_time/0]). +-export([get_universal_epoch/0]). -boot_mnesia({mnesia, [boot]}). @@ -108,7 +108,7 @@ handle_info(collect, State = #{count := Count, collect := Collect, temp_collect handle_info(clear_expire_data, State = #{expire_interval := ExpireInterval}) -> timer(?CLEAR_INTERVAL, clear_expire_data), - T1 = get_local_time(), + T1 = get_universal_epoch(), Spec = ets:fun2ms(fun({_, T, _C} = Data) when (T1 - T) > ExpireInterval -> Data end), Collects = ets:select(?TAB_COLLECT, Spec), lists:foreach(fun(Collect) -> @@ -161,7 +161,7 @@ flush({Connection, Route, Subscription}, {Received0, Sent0, Dropped0}) -> diff(Received, Received0), diff(Sent, Sent0), diff(Dropped, Dropped0)}, - Ts = get_local_time(), + Ts = get_universal_epoch(), {atomic, ok} = mria:transaction(mria:local_content_shard(), fun mnesia:write/3, [ ?TAB_COLLECT @@ -179,8 +179,8 @@ timer(Secs, Msg) -> erlang:send_after(Secs, self(), Msg). get_today_remaining_seconds() -> - ?CLEAR_INTERVAL - (get_local_time() rem ?CLEAR_INTERVAL). + ?CLEAR_INTERVAL - (get_universal_epoch() rem ?CLEAR_INTERVAL). -get_local_time() -> - (calendar:datetime_to_gregorian_seconds(calendar:local_time()) - +get_universal_epoch() -> + (calendar:datetime_to_gregorian_seconds(calendar:universal_time()) - calendar:datetime_to_gregorian_seconds({{1970,1,1}, {0,0,0}})). diff --git a/apps/emqx_dashboard/src/emqx_dashboard_monitor_api.erl b/apps/emqx_dashboard/src/emqx_dashboard_monitor_api.erl index a05746811..5ada429a3 100644 --- a/apps/emqx_dashboard/src/emqx_dashboard_monitor_api.erl +++ b/apps/emqx_dashboard/src/emqx_dashboard_monitor_api.erl @@ -278,7 +278,7 @@ sampling(Node, Counter) -> rpc:call(Node, ?MODULE, sampling, [Node, Counter]). select_data() -> - Time = emqx_dashboard_collection:get_local_time() - 7200000, + Time = emqx_dashboard_collection:get_universal_epoch() - 7200000, ets:select(?TAB_COLLECT, [{{mqtt_collect,'$1','$2'}, [{'>', '$1', Time}], ['$_']}]). format(Collects) -> diff --git a/apps/emqx_dashboard/src/emqx_dashboard_swagger.erl b/apps/emqx_dashboard/src/emqx_dashboard_swagger.erl index 2dcdba643..9a54be9c5 100644 --- a/apps/emqx_dashboard/src/emqx_dashboard_swagger.erl +++ b/apps/emqx_dashboard/src/emqx_dashboard_swagger.erl @@ -211,11 +211,16 @@ check_request_body(#{body := Body}, Schema, Module, CheckFun, true) -> %% {good_nest_2, mk(ref(?MODULE, good_ref), #{})} %% ]} %% ] -check_request_body(#{body := Body}, Spec, _Module, CheckFun, false) -> +check_request_body(#{body := Body}, Spec, _Module, CheckFun, false)when is_list(Spec) -> lists:foldl(fun({Name, Type}, Acc) -> Schema = ?INIT_SCHEMA#{roots => [{Name, Type}]}, maps:merge(Acc, CheckFun(Schema, Body, #{})) - end, #{}, Spec). + end, #{}, Spec); + +%% requestBody => #{content => #{ 'application/octet-stream' => +%% #{schema => #{ type => string, format => binary}}} +check_request_body(#{body := Body}, Spec, _Module, _CheckFun, false)when is_map(Spec) -> + Body. %% tags, description, summary, security, deprecated meta_to_spec(Meta, Module) -> @@ -287,6 +292,7 @@ trans_desc(Spec, Hocon) -> Desc -> Spec#{description => to_bin(Desc)} end. +request_body(#{content := _} = Content, _Module) -> {Content, []}; request_body([], _Module) -> {[], []}; request_body(Schema, Module) -> {{Props, Refs}, Examples} = diff --git a/apps/emqx_gateway/src/coap/emqx_coap_channel.erl b/apps/emqx_gateway/src/coap/emqx_coap_channel.erl index ab079b587..60f3d4837 100644 --- a/apps/emqx_gateway/src/coap/emqx_coap_channel.erl +++ b/apps/emqx_gateway/src/coap/emqx_coap_channel.erl @@ -460,8 +460,9 @@ process_connect(#channel{ctx = Ctx, {ok, _Sess} -> RandVal = rand:uniform(?TOKEN_MAXIMUM), Token = erlang:list_to_binary(erlang:integer_to_list(RandVal)), + NResult = Result#{events => [{event, connected}]}, iter(Iter, - reply({ok, created}, Token, Msg, Result), + reply({ok, created}, Token, Msg, NResult), Channel#channel{token = Token}); {error, Reason} -> ?SLOG(error, #{ msg => "failed_open_session" @@ -568,7 +569,8 @@ process_out(Outs, Result, Channel, _) -> Reply -> [Reply | Outs2] end, - {ok, {outgoing, Outs3}, Channel}. + Events = maps:get(events, Result, []), + {ok, [{outgoing, Outs3}] ++ Events, Channel}. %% leaf node process_nothing(_, _, Channel) -> @@ -607,4 +609,6 @@ process_reply(Reply, Result, #channel{session = Session} = Channel, _) -> Session2 = emqx_coap_session:set_reply(Reply, Session), Outs = maps:get(out, Result, []), Outs2 = lists:reverse(Outs), - {ok, {outgoing, [Reply | Outs2]}, Channel#channel{session = Session2}}. + Events = maps:get(events, Result, []), + {ok, [{outgoing, [Reply | Outs2]}] ++ Events, + Channel#channel{session = Session2}}. diff --git a/apps/emqx_gateway/src/emqx_gateway_api.erl b/apps/emqx_gateway/src/emqx_gateway_api.erl index 3ee209f19..3a133e340 100644 --- a/apps/emqx_gateway/src/emqx_gateway_api.erl +++ b/apps/emqx_gateway/src/emqx_gateway_api.erl @@ -83,7 +83,7 @@ gateway(post, Request) -> {ok, NGwConf} -> {201, NGwConf}; {error, Reason} -> - return_http_error(500, Reason) + emqx_gateway_http:reason2resp(Reason) end end catch diff --git a/apps/emqx_gateway/src/emqx_gateway_api_clients.erl b/apps/emqx_gateway/src/emqx_gateway_api_clients.erl index 69a06be61..697bccc1d 100644 --- a/apps/emqx_gateway/src/emqx_gateway_api_clients.erl +++ b/apps/emqx_gateway/src/emqx_gateway_api_clients.erl @@ -745,7 +745,8 @@ common_client_props() -> "due to exceeding the length">>})} , {awaiting_rel_cnt, mk(integer(), - #{ desc => <<"Number of awaiting PUBREC packet">>})} + %% FIXME: PUBREC ?? + #{ desc => <<"Number of awaiting acknowledge packet">>})} , {awaiting_rel_max, mk(integer(), #{ desc => <<"Maximum allowed number of awaiting PUBREC " @@ -755,25 +756,25 @@ common_client_props() -> #{ desc => <<"Number of bytes received by EMQ X Broker">>})} , {recv_cnt, mk(integer(), - #{ desc => <<"Number of TCP packets received">>})} + #{ desc => <<"Number of socket packets received">>})} , {recv_pkt, mk(integer(), - #{ desc => <<"Number of MQTT packets received">>})} + #{ desc => <<"Number of protocol packets received">>})} , {recv_msg, mk(integer(), - #{ desc => <<"Number of PUBLISH packets received">>})} + #{ desc => <<"Number of message packets received">>})} , {send_oct, mk(integer(), #{ desc => <<"Number of bytes sent">>})} , {send_cnt, mk(integer(), - #{ desc => <<"Number of TCP packets sent">>})} + #{ desc => <<"Number of socket packets sent">>})} , {send_pkt, mk(integer(), - #{ desc => <<"Number of MQTT packets sent">>})} + #{ desc => <<"Number of protocol packets sent">>})} , {send_msg, mk(integer(), - #{ desc => <<"Number of PUBLISH packets sent">>})} + #{ desc => <<"Number of message packets sent">>})} , {mailbox_len, mk(integer(), #{ desc => <<"Process mailbox size">>})} diff --git a/apps/emqx_gateway/src/emqx_gateway_api_listeners.erl b/apps/emqx_gateway/src/emqx_gateway_api_listeners.erl index fbf923700..ad381ce44 100644 --- a/apps/emqx_gateway/src/emqx_gateway_api_listeners.erl +++ b/apps/emqx_gateway/src/emqx_gateway_api_listeners.erl @@ -32,7 +32,7 @@ -import(emqx_gateway_api_authn, [schema_authn/0]). -%% minirest/dashbaord_swagger behaviour callbacks +%% minirest/dashboard_swagger behaviour callbacks -export([ api_spec/0 , paths/0 , schema/1 diff --git a/apps/emqx_gateway/src/emqx_gateway_conf.erl b/apps/emqx_gateway/src/emqx_gateway_conf.erl index 987c9720b..cd1f64871 100644 --- a/apps/emqx_gateway/src/emqx_gateway_conf.erl +++ b/apps/emqx_gateway/src/emqx_gateway_conf.erl @@ -248,7 +248,8 @@ update(Req) -> res(emqx_conf:update([gateway], Req, #{override_to => cluster})). res({ok, Result}) -> {ok, Result}; -res({error, {pre_config_update,emqx_gateway_conf,Reason}}) -> {error, Reason}; +res({error, {pre_config_update,?MODULE,Reason}}) -> {error, Reason}; +res({error, {post_config_update,?MODULE,Reason}}) -> {error, Reason}; res({error, Reason}) -> {error, Reason}. bin({LType, LName}) -> @@ -314,12 +315,12 @@ pre_config_update(_, {load_gateway, GwName, Conf}, RawConf) -> NConf = tune_gw_certs(fun convert_certs/2, GwName, Conf), {ok, emqx_map_lib:deep_merge(RawConf, #{GwName => NConf})}; _ -> - {error, already_exist} + badres_gateway(already_exist, GwName) end; pre_config_update(_, {update_gateway, GwName, Conf}, RawConf) -> case maps:get(GwName, RawConf, undefined) of undefined -> - {error, not_found}; + badres_gateway(not_found, GwName); _ -> NConf = maps:without([<<"listeners">>, ?AUTHN_BIN], Conf), {ok, emqx_map_lib:deep_merge(RawConf, #{GwName => NConf})} @@ -341,13 +342,13 @@ pre_config_update(_, {add_listener, GwName, {LType, LName}, Conf}, RawConf) -> RawConf, #{GwName => #{<<"listeners">> => NListener}})}; _ -> - {error, already_exist} + badres_listener(already_exist, GwName, LType, LName) end; pre_config_update(_, {update_listener, GwName, {LType, LName}, Conf}, RawConf) -> case emqx_map_lib:deep_get( [GwName, <<"listeners">>, LType, LName], RawConf, undefined) of undefined -> - {error, not_found}; + badres_listener(not_found, GwName, LType, LName); OldConf -> NConf = convert_certs(certs_dir(GwName), Conf, OldConf), NListener = #{LType => #{LName => NConf}}, @@ -374,14 +375,14 @@ pre_config_update(_, {add_authn, GwName, Conf}, RawConf) -> RawConf, #{GwName => #{?AUTHN_BIN => Conf}})}; _ -> - {error, already_exist} + badres_authn(already_exist, GwName) end; pre_config_update(_, {add_authn, GwName, {LType, LName}, Conf}, RawConf) -> case emqx_map_lib:deep_get( [GwName, <<"listeners">>, LType, LName], RawConf, undefined) of undefined -> - {error, not_found}; + badres_listener(not_found, GwName, LType, LName); Listener -> case maps:get(?AUTHN_BIN, Listener, undefined) of undefined -> @@ -391,14 +392,14 @@ pre_config_update(_, {add_authn, GwName, {LType, LName}, Conf}, RawConf) -> #{LType => #{LName => NListener}}}}, {ok, emqx_map_lib:deep_merge(RawConf, NGateway)}; _ -> - {error, already_exist} + badres_listener_authn(already_exist, GwName, LType, LName) end end; pre_config_update(_, {update_authn, GwName, Conf}, RawConf) -> case emqx_map_lib:deep_get( [GwName, ?AUTHN_BIN], RawConf, undefined) of undefined -> - {error, not_found}; + badres_authn(not_found, GwName); _ -> {ok, emqx_map_lib:deep_merge( RawConf, @@ -409,11 +410,11 @@ pre_config_update(_, {update_authn, GwName, {LType, LName}, Conf}, RawConf) -> [GwName, <<"listeners">>, LType, LName], RawConf, undefined) of undefined -> - {error, not_found}; + badres_listener(not_found, GwName, LType, LName); Listener -> case maps:get(?AUTHN_BIN, Listener, undefined) of undefined -> - {error, not_found}; + badres_listener_authn(not_found, GwName, LType, LName); Auth -> NListener = maps:put( ?AUTHN_BIN, @@ -437,6 +438,38 @@ pre_config_update(_, UnknownReq, _RawConf) -> logger:error("Unknown configuration update request: ~0p", [UnknownReq]), {error, badreq}. +badres_gateway(not_found, GwName) -> + {error, {badres, #{resource => gateway, gateway => GwName, + reason => not_found}}}; +badres_gateway(already_exist, GwName) -> + {error, {badres, #{resource => gateway, gateway => GwName, + reason => already_exist}}}. + +badres_listener(not_found, GwName, LType, LName) -> + {error, {badres, #{resource => listener, gateway => GwName, + listener => {GwName, LType, LName}, + reason => not_found}}}; +badres_listener(already_exist, GwName, LType, LName) -> + {error, {badres, #{resource => listener, gateway => GwName, + listener => {GwName, LType, LName}, + reason => already_exist}}}. + +badres_authn(not_found, GwName) -> + {error, {badres, #{resource => authn, gateway => GwName, + reason => not_found}}}; +badres_authn(already_exist, GwName) -> + {error, {badres, #{resource => authn, gateway => GwName, + reason => already_exist}}}. + +badres_listener_authn(not_found, GwName, LType, LName) -> + {error, {badres, #{resource => listener_authn, gateway => GwName, + listener => {GwName, LType, LName}, + reason => not_found}}}; +badres_listener_authn(already_exist, GwName, LType, LName) -> + {error, {badres, #{resource => listener_authn, gateway => GwName, + listener => {GwName, LType, LName}, + reason => already_exist}}}. + -spec post_config_update(list(atom()), emqx_config:update_request(), emqx_config:config(), diff --git a/apps/emqx_gateway/src/emqx_gateway_http.erl b/apps/emqx_gateway/src/emqx_gateway_http.erl index 810a79987..634ae8252 100644 --- a/apps/emqx_gateway/src/emqx_gateway_http.erl +++ b/apps/emqx_gateway/src/emqx_gateway_http.erl @@ -23,6 +23,8 @@ -define(AUTHN, ?EMQX_AUTHENTICATION_CONFIG_ROOT_NAME_ATOM). +-import(emqx_gateway_utils, [listener_id/3]). + %% Mgmt APIs - gateway -export([ gateways/1 ]). @@ -59,10 +61,7 @@ , with_authn/2 , with_listener_authn/3 , checks/2 - , schema_bad_request/0 - , schema_not_found/0 - , schema_internal_error/0 - , schema_no_content/0 + , reason2resp/1 ]). -type gateway_summary() :: @@ -131,7 +130,7 @@ current_connections_count(GwName) -> get_listeners_status(GwName, Config) -> Listeners = emqx_gateway_utils:normalize_config(Config), lists:map(fun({Type, LisName, ListenOn, _, _}) -> - Name0 = emqx_gateway_utils:listener_id(GwName, Type, LisName), + Name0 = listener_id(GwName, Type, LisName), Name = {Name0, ListenOn}, LisO = #{id => Name0, type => Type, name => LisName}, case catch esockd:listener(Name) of @@ -223,12 +222,7 @@ remove_authn(GwName, ListenerId) -> confexp(ok) -> ok; confexp({ok, Res}) -> {ok, Res}; -confexp({error, badarg}) -> - error({update_conf_error, badarg}); -confexp({error, not_found}) -> - error({update_conf_error, not_found}); -confexp({error, already_exist}) -> - error({update_conf_error, already_exist}). +confexp({error, Reason}) -> error(Reason). %%-------------------------------------------------------------------- %% Mgmt APIs - clients @@ -322,6 +316,59 @@ with_channel(GwName, ClientId, Fun) -> %% Utils %%-------------------------------------------------------------------- +-spec reason2resp({atom(), map()} | any()) -> binary() | any(). +reason2resp({badconf, #{key := Key, value := Value, reason := Reason}}) -> + fmt400err("Bad config value '~s' for '~s', reason: ~s", + [Value, Key, Reason]); +reason2resp({badres, #{resource := gateway, + gateway := GwName, + reason := not_found}}) -> + fmt400err("The ~s gateway is unloaded", [GwName]); + +reason2resp({badres, #{resource := gateway, + gateway := GwName, + reason := already_exist}}) -> + fmt400err("The ~s gateway has loaded", [GwName]); + +reason2resp({badres, #{resource := listener, + listener := {GwName, LType, LName}, + reason := not_found}}) -> + fmt400err("Listener ~s not found", + [listener_id(GwName, LType, LName)]); + +reason2resp({badres, #{resource := listener, + listener := {GwName, LType, LName}, + reason := already_exist}}) -> + fmt400err("The listener ~s of ~s already exist", + [listener_id(GwName, LType, LName), GwName]); + +reason2resp({badres, #{resource := authn, + gateway := GwName, + reason := not_found}}) -> + fmt400err("The authentication not found on ~s", [GwName]); + +reason2resp({badres, #{resource := authn, + gateway := GwName, + reason := already_exist}}) -> + fmt400err("The authentication already exist on ~s", [GwName]); + +reason2resp({badres, #{resource := listener_authn, + listener := {GwName, LType, LName}, + reason := not_found}}) -> + fmt400err("The authentication not found on ~s", + [listener_id(GwName, LType, LName)]); + +reason2resp({badres, #{resource := listener_authn, + listener := {GwName, LType, LName}, + reason := already_exist}}) -> + fmt400err("The authentication already exist on ~s", + [listener_id(GwName, LType, LName)]); + +reason2resp(R) -> return_http_error(500, R). + +fmt400err(Fmt, Args) -> + return_http_error(400, io_lib:format(Fmt, Args)). + -spec return_http_error(integer(), any()) -> {integer(), binary()}. return_http_error(Code, Msg) -> {Code, emqx_json:encode( @@ -378,19 +425,12 @@ with_gateway(GwName0, Fun) -> Path = lists:concat( lists:join(".", lists:map(fun to_list/1, Path0))), return_http_error(404, "Resource not found. path: " ++ Path); - %% Exceptions from: confexp/1 - error : {update_conf_error, badarg} -> - return_http_error(400, "Bad arguments"); - error : {update_conf_error, not_found} -> - return_http_error(404, "Resource not found"); - error : {update_conf_error, already_exist} -> - return_http_error(400, "Resource already exist"); Class : Reason : Stk -> ?SLOG(error, #{ msg => "uncatched_error" , reason => {Class, Reason} , stacktrace => Stk }), - return_http_error(500, {Class, Reason, Stk}) + reason2resp(Reason) end. -spec checks(list(), map()) -> ok. @@ -408,20 +448,6 @@ to_list(A) when is_atom(A) -> to_list(B) when is_binary(B) -> binary_to_list(B). -%%-------------------------------------------------------------------- -%% common schemas - -schema_bad_request() -> - emqx_mgmt_util:error_schema( - <<"Some Params missed">>, ['PARAMETER_MISSED']). -schema_internal_error() -> - emqx_mgmt_util:error_schema( - <<"Ineternal Server Error">>, ['INTERNAL_SERVER_ERROR']). -schema_not_found() -> - emqx_mgmt_util:error_schema(<<"Resource Not Found">>). -schema_no_content() -> - #{description => <<"No Content">>}. - %%-------------------------------------------------------------------- %% Internal funcs diff --git a/apps/emqx_gateway/src/emqx_gateway_insta_sup.erl b/apps/emqx_gateway/src/emqx_gateway_insta_sup.erl index efb3f6fe6..20db58512 100644 --- a/apps/emqx_gateway/src/emqx_gateway_insta_sup.erl +++ b/apps/emqx_gateway/src/emqx_gateway_insta_sup.erl @@ -112,7 +112,7 @@ init([Gateway, Ctx, _GwDscrptr]) -> true -> case cb_gateway_load(State) of {error, Reason} -> - {stop, {load_gateway_failure, Reason}}; + {stop, Reason}; {ok, NState} -> {ok, NState} end @@ -360,7 +360,7 @@ cb_gateway_unload(State = #state{name = GwName, , reason => {Class, Reason} , stacktrace => Stk }), - {error, {Class, Reason, Stk}} + {error, Reason} after _ = do_deinit_authn(State#state.authns) end. @@ -381,7 +381,7 @@ cb_gateway_load(State = #state{name = GwName, case CbMod:on_gateway_load(Gateway, NCtx) of {error, Reason} -> do_deinit_authn(AuthnNames), - throw({callback_return_error, Reason}); + {error, Reason}; {ok, ChildPidOrSpecs, GwState} -> ChildPids = start_child_process(ChildPidOrSpecs), {ok, State#state{ @@ -403,7 +403,7 @@ cb_gateway_load(State = #state{name = GwName, , reason => {Class, Reason1} , stacktrace => Stk }), - {error, {Class, Reason1, Stk}} + {error, Reason1} end. cb_gateway_update(Config, @@ -412,7 +412,7 @@ cb_gateway_update(Config, try #{cbkmod := CbMod} = emqx_gateway_registry:lookup(GwName), case CbMod:on_gateway_update(Config, detailed_gateway_info(State), GwState) of - {error, Reason} -> throw({callback_return_error, Reason}); + {error, Reason} -> {error, Reason}; {ok, ChildPidOrSpecs, NGwState} -> %% XXX: Hot-upgrade ??? ChildPids = start_child_process(ChildPidOrSpecs), @@ -430,7 +430,7 @@ cb_gateway_update(Config, , reason => {Class, Reason1} , stacktrace => Stk }), - {error, {Class, Reason1, Stk}} + {error, Reason1} end. start_child_process([]) -> []; diff --git a/apps/emqx_gateway/src/emqx_gateway_schema.erl b/apps/emqx_gateway/src/emqx_gateway_schema.erl index dfdf6ea2a..cc14eaa33 100644 --- a/apps/emqx_gateway/src/emqx_gateway_schema.erl +++ b/apps/emqx_gateway/src/emqx_gateway_schema.erl @@ -118,6 +118,7 @@ fields(mqttsn) -> [ {gateway_id, sc(integer(), #{ default => 1 + , nullable => false , desc => "MQTT-SN Gateway Id.
When the broadcast option is enabled, @@ -142,6 +143,7 @@ The client just sends its PUBLISH messages to a GW" , {predefined, sc(hoconsc:array(ref(mqttsn_predefined)), #{ default => [] + , nullable => {true, recursively} , desc => <<"The Pre-defined topic ids and topic names.
A 'pre-defined' topic id is a topic id whose mapping to a topic name @@ -217,6 +219,7 @@ fields(lwm2m) -> [ {xml_dir, sc(binary(), #{ default =>"etc/lwm2m_xml" + , nullable => false , desc => "The Directory for LwM2M Resource defination" })} , {lifetime_min, @@ -265,18 +268,21 @@ beyond this time window are temporarily stored in memory." fields(exproto) -> [ {server, sc(ref(exproto_grpc_server), - #{ desc => "Configurations for starting the ConnectionAdapter service" + #{ nullable => false + , desc => "Configurations for starting the ConnectionAdapter service" })} , {handler, sc(ref(exproto_grpc_handler), - #{ desc => "Configurations for request to ConnectionHandler service" + #{ nullable => false + , desc => "Configurations for request to ConnectionHandler service" })} , {listeners, sc(ref(udp_tcp_listeners))} ] ++ gateway_common_options(); fields(exproto_grpc_server) -> [ {bind, - sc(hoconsc:union([ip_port(), integer()]))} + sc(hoconsc:union([ip_port(), integer()]), + #{nullable => false})} , {ssl, sc(ref(ssl_server_opts), #{ nullable => {true, recursively} @@ -284,7 +290,7 @@ fields(exproto_grpc_server) -> ]; fields(exproto_grpc_handler) -> - [ {address, sc(binary())} + [ {address, sc(binary(), #{nullable => false})} , {ssl, sc(ref(ssl_client_opts), #{ nullable => {true, recursively} @@ -316,11 +322,13 @@ fields(lwm2m_translators) -> For each new LwM2M client that succeeds in going online, the gateway creates a the subscription relationship to receive downstream commands and send it to the LwM2M client" + , nullable => false })} , {response, sc(ref(translator), #{ desc => "The topic for gateway to publish the acknowledge events from LwM2M client" + , nullable => false })} , {notify, sc(ref(translator), @@ -328,21 +336,24 @@ the LwM2M client" "The topic for gateway to publish the notify events from LwM2M client.
After succeed observe a resource of LwM2M client, Gateway will send the notifyevents via this topic, if the client reports any resource changes" + , nullable => false })} , {register, sc(ref(translator), #{ desc => "The topic for gateway to publish the register events from LwM2M client.
" + , nullable => false })} , {update, sc(ref(translator), #{ desc => "The topic for gateway to publish the update events from LwM2M client.
" + , nullable => false })} ]; fields(translator) -> - [ {topic, sc(binary())} + [ {topic, sc(binary(), #{nullable => false})} , {qos, sc(range(0, 2), #{default => 0})} ]; diff --git a/apps/emqx_gateway/src/emqx_gateway_utils.erl b/apps/emqx_gateway/src/emqx_gateway_utils.erl index fa74f9437..8a81584d6 100644 --- a/apps/emqx_gateway/src/emqx_gateway_utils.erl +++ b/apps/emqx_gateway/src/emqx_gateway_utils.erl @@ -90,6 +90,7 @@ childspec(Id, Type, Mod, Args) -> -> {ok, pid()} | {error, supervisor:startchild_err()}. supervisor_ret({ok, Pid, _Info}) -> {ok, Pid}; +supervisor_ret({error, {Reason, _Child}}) -> {error, Reason}; supervisor_ret(Ret) -> Ret. -spec find_sup_child(Sup :: pid() | atom(), ChildId :: supervisor:child_id()) diff --git a/apps/emqx_gateway/src/exproto/emqx_exproto_impl.erl b/apps/emqx_gateway/src/exproto/emqx_exproto_impl.erl index d0ac84322..46e3a1628 100644 --- a/apps/emqx_gateway/src/exproto/emqx_exproto_impl.erl +++ b/apps/emqx_gateway/src/exproto/emqx_exproto_impl.erl @@ -75,7 +75,13 @@ stop_grpc_server(GwName) -> start_grpc_client_channel(_GwName, undefined) -> undefined; start_grpc_client_channel(GwName, Options = #{address := Address}) -> - {Host, Port} = emqx_gateway_utils:parse_address(Address), + {Host, Port} = try emqx_gateway_utils:parse_address(Address) + catch error : badarg -> + throw({badconf, #{key => address, + value => Address, + reason => illegal_grpc_address + }}) + end, case maps:to_list(maps:get(ssl, Options, #{})) of [] -> SvrAddr = compose_http_uri(http, Host, Port), diff --git a/apps/emqx_gateway/src/lwm2m/emqx_lwm2m_impl.erl b/apps/emqx_gateway/src/lwm2m/emqx_lwm2m_impl.erl index 6e01161bb..ee27d89b1 100644 --- a/apps/emqx_gateway/src/lwm2m/emqx_lwm2m_impl.erl +++ b/apps/emqx_gateway/src/lwm2m/emqx_lwm2m_impl.erl @@ -50,14 +50,20 @@ unreg() -> on_gateway_load(_Gateway = #{ name := GwName, config := Config }, Ctx) -> - %% Xml registry - {ok, RegPid} = emqx_lwm2m_xml_object_db:start_link(maps:get(xml_dir, Config)), - - Listeners = emqx_gateway_utils:normalize_config(Config), - ListenerPids = lists:map(fun(Lis) -> - start_listener(GwName, Ctx, Lis) - end, Listeners), - {ok, ListenerPids, _GwState = #{ctx => Ctx, registry => RegPid}}. + XmlDir = maps:get(xml_dir, Config), + case emqx_lwm2m_xml_object_db:start_link(XmlDir) of + {ok, RegPid} -> + Listeners = emqx_gateway_utils:normalize_config(Config), + ListenerPids = lists:map(fun(Lis) -> + start_listener(GwName, Ctx, Lis) + end, Listeners), + {ok, ListenerPids, _GwState = #{ctx => Ctx, registry => RegPid}}; + {error, Reason} -> + throw({badconf, #{ key => xml_dir + , value => XmlDir + , reason => Reason + }}) + end. on_gateway_update(Config, Gateway, GwState = #{ctx := Ctx}) -> GwName = maps:get(name, Gateway), diff --git a/apps/emqx_gateway/src/lwm2m/emqx_lwm2m_xml_object_db.erl b/apps/emqx_gateway/src/lwm2m/emqx_lwm2m_xml_object_db.erl index 3cef3c19e..509971b15 100644 --- a/apps/emqx_gateway/src/lwm2m/emqx_lwm2m_xml_object_db.erl +++ b/apps/emqx_gateway/src/lwm2m/emqx_lwm2m_xml_object_db.erl @@ -47,6 +47,11 @@ %% API Function Definitions %% ------------------------------------------------------------------ +-spec start_link(string()) + -> {ok, pid()} + | ignore + | {error, no_xml_files_found} + | {error, term()}. start_link(XmlDir) -> gen_server:start_link({local, ?MODULE}, ?MODULE, [XmlDir], []). @@ -85,8 +90,11 @@ stop() -> init([XmlDir]) -> _ = ets:new(?LWM2M_OBJECT_DEF_TAB, [set, named_table, protected]), _ = ets:new(?LWM2M_OBJECT_NAME_TO_ID_TAB, [set, named_table, protected]), - load(XmlDir), - {ok, #state{}}. + case load(XmlDir) of + ok -> + {ok, #state{}}; + {error, Reason} -> {stop, Reason} + end. handle_call(_Request, _From, State) -> {reply, ignored, State}. @@ -116,7 +124,7 @@ load(BaseDir) -> Wild end, case filelib:wildcard(Wild2) of - [] -> error(no_xml_files_found, BaseDir); + [] -> {error, no_xml_files_found}; AllXmlFiles -> load_loop(AllXmlFiles) end. diff --git a/apps/emqx_gateway/test/emqx_gateway_conf_SUITE.erl b/apps/emqx_gateway/test/emqx_gateway_conf_SUITE.erl index f3859532e..459ebe364 100644 --- a/apps/emqx_gateway/test/emqx_gateway_conf_SUITE.erl +++ b/apps/emqx_gateway/test/emqx_gateway_conf_SUITE.erl @@ -245,8 +245,9 @@ t_load_unload_gateway(_) -> ?CONF_STOMP_AUTHN_1, ?CONF_STOMP_LISTENER_1), {ok, _} = emqx_gateway_conf:load_gateway(stomp, StompConf1), - {error, already_exist} = - emqx_gateway_conf:load_gateway(stomp, StompConf1), + ?assertMatch( + {error, {badres, #{reason := already_exist}}}, + emqx_gateway_conf:load_gateway(stomp, StompConf1)), assert_confs(StompConf1, emqx:get_raw_config([gateway, stomp])), {ok, _} = emqx_gateway_conf:update_gateway(stomp, StompConf2), @@ -255,8 +256,9 @@ t_load_unload_gateway(_) -> ok = emqx_gateway_conf:unload_gateway(stomp), ok = emqx_gateway_conf:unload_gateway(stomp), - {error, not_found} = - emqx_gateway_conf:update_gateway(stomp, StompConf2), + ?assertMatch( + {error, {badres, #{reason := not_found}}}, + emqx_gateway_conf:update_gateway(stomp, StompConf2)), ?assertException(error, {config_not_found, [gateway, stomp]}, emqx:get_raw_config([gateway, stomp])), @@ -280,8 +282,9 @@ t_load_remove_authn(_) -> ok = emqx_gateway_conf:remove_authn(<<"stomp">>), - {error, not_found} = - emqx_gateway_conf:update_authn(<<"stomp">>, ?CONF_STOMP_AUTHN_2), + ?assertMatch( + {error, {badres, #{reason := not_found}}}, + emqx_gateway_conf:update_authn(<<"stomp">>, ?CONF_STOMP_AUTHN_2)), ?assertException( error, {config_not_found, [gateway, stomp, authentication]}, @@ -312,9 +315,10 @@ t_load_remove_listeners(_) -> ok = emqx_gateway_conf:remove_listener( <<"stomp">>, {<<"tcp">>, <<"default">>}), - {error, not_found} = - emqx_gateway_conf:update_listener( - <<"stomp">>, {<<"tcp">>, <<"default">>}, ?CONF_STOMP_LISTENER_2), + ?assertMatch( + {error, {badres, #{reason := not_found}}}, + emqx_gateway_conf:update_listener( + <<"stomp">>, {<<"tcp">>, <<"default">>}, ?CONF_STOMP_LISTENER_2)), ?assertException( error, {config_not_found, [gateway, stomp, listeners, tcp, default]}, @@ -352,9 +356,10 @@ t_load_remove_listener_authn(_) -> ok = emqx_gateway_conf:remove_authn( <<"stomp">>, {<<"tcp">>, <<"default">>}), - {error, not_found} = - emqx_gateway_conf:update_authn( - <<"stomp">>, {<<"tcp">>, <<"default">>}, ?CONF_STOMP_AUTHN_2), + ?assertMatch( + {error, {badres, #{reason := not_found}}}, + emqx_gateway_conf:update_authn( + <<"stomp">>, {<<"tcp">>, <<"default">>}, ?CONF_STOMP_AUTHN_2)), Path = [gateway, stomp, listeners, tcp, default, authentication], ?assertException( @@ -426,9 +431,12 @@ t_add_listener_with_certs_content(_) -> ok = emqx_gateway_conf:remove_listener( <<"stomp">>, {<<"ssl">>, <<"default">>}), assert_ssl_confs_files_deleted(SslConf), - {error, not_found} = - emqx_gateway_conf:update_listener( - <<"stomp">>, {<<"ssl">>, <<"default">>}, ?CONF_STOMP_LISTENER_SSL_2), + + ?assertMatch( + {error, {badres, #{reason := not_found}}}, + emqx_gateway_conf:update_listener( + <<"stomp">>, {<<"ssl">>, <<"default">>}, ?CONF_STOMP_LISTENER_SSL_2)), + ?assertException( error, {config_not_found, [gateway, stomp, listeners, ssl, default]}, emqx:get_raw_config([gateway, stomp, listeners, ssl, default]) diff --git a/apps/emqx_management/src/emqx_mgmt_api_banned.erl b/apps/emqx_management/src/emqx_mgmt_api_banned.erl index c9ae1401d..6521a549c 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_banned.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_banned.erl @@ -101,15 +101,15 @@ fields(ban) -> desc => <<"Banned type clientid, username, peerhost">>, nullable => false, example => username})}, - {who, hoconsc:mk(binary(), #{ + {who, hoconsc:mk(emqx_schema:unicode_binary(), #{ desc => <<"Client info as banned type">>, nullable => false, - example => <<"Badass">>})}, + example => <<"Badass坏"/utf8>>})}, {by, hoconsc:mk(binary(), #{ desc => <<"Commander">>, nullable => true, example => <<"mgmt_api">>})}, - {reason, hoconsc:mk(binary(), #{ + {reason, hoconsc:mk(emqx_schema:unicode_binary(), #{ desc => <<"Banned reason">>, nullable => true, example => <<"Too many requests">>})}, diff --git a/apps/emqx_modules/src/emqx_telemetry.erl b/apps/emqx_modules/src/emqx_telemetry.erl index 195b45a3d..ce9553c82 100644 --- a/apps/emqx_modules/src/emqx_telemetry.erl +++ b/apps/emqx_modules/src/emqx_telemetry.erl @@ -220,37 +220,24 @@ os_info() -> [{os_name, Name}, {os_version, Version}]; {unix, _} -> - case file:read_file_info("/etc/os-release") of + case file:read_file("/etc/os-release") of {error, _} -> [{os_name, "Unknown"}, {os_version, "Unknown"}]; - {ok, FileInfo} -> - case FileInfo#file_info.access of - Access when Access =:= read orelse Access =:= read_write -> - OSInfo = lists:foldl(fun(Line, Acc) -> - [Var, Value] = string:tokens(Line, "="), - NValue = case Value of - _ when is_list(Value) -> - lists:nth(1, string:tokens(Value, "\"")); - _ -> - Value - end, - [{Var, NValue} | Acc] - end, [], string:tokens(os:cmd("cat /etc/os-release"), "\n")), - [{os_name, get_value("NAME", OSInfo, "Unknown")}, - {os_version, get_value("VERSION", OSInfo, - get_value("VERSION_ID", OSInfo, "Unknown"))}]; - _ -> - [{os_name, "Unknown"}, - {os_version, "Unknown"}] - end + {ok, FileContent} -> + OSInfo = parse_os_release(FileContent), + [{os_name, get_value("NAME", OSInfo)}, + {os_version, get_value("VERSION", OSInfo, + get_value("VERSION_ID", OSInfo, + get_value("PRETTY_NAME", OSInfo)))}] end; {win32, nt} -> Ver = os:cmd("ver"), case re:run(Ver, "[a-zA-Z ]+ \\[Version ([0-9]+[\.])+[0-9]+\\]", [{capture, none}]) of match -> [NVer | _] = string:tokens(Ver, "\r\n"), - {match, [Version]} = re:run(NVer, "([0-9]+[\.])+[0-9]+", [{capture, first, list}]), + {match, [Version]} = + re:run(NVer, "([0-9]+[\.])+[0-9]+", [{capture, first, list}]), [Name | _] = string:split(NVer, " [Version "), [{os_name, Name}, {os_version, Version}]; @@ -307,7 +294,8 @@ generate_uuid() -> <> = <<16#01:4, TimeHigh:12>>, <> = <<1:1, 0:1, ClockSeq:14>>, <> = <>, - list_to_binary(io_lib:format("~.16B-~.16B-~.16B-~.16B-~.16B", [TimeLow, TimeMid, NTimeHigh, NClockSeq, Node])). + list_to_binary(io_lib:format( "~.16B-~.16B-~.16B-~.16B-~.16B" + , [TimeLow, TimeMid, NTimeHigh, NClockSeq, Node])). get_telemetry(#state{uuid = UUID}) -> OSInfo = os_info(), @@ -339,7 +327,22 @@ report_telemetry(State = #state{url = URL}) -> httpc_request(Method, URL, Headers, Body) -> httpc:request(Method, {URL, Headers, "application/json", Body}, [], []). +parse_os_release(FileContent) -> + lists:foldl(fun(Line, Acc) -> + [Var, Value] = string:tokens(Line, "="), + NValue = case Value of + _ when is_list(Value) -> + lists:nth(1, string:tokens(Value, "\"")); + _ -> + Value + end, + [{Var, NValue} | Acc] + end, + [], string:tokens(binary:bin_to_list(FileContent), "\n")). + bin(L) when is_list(L) -> list_to_binary(L); +bin(A) when is_atom(A) -> + atom_to_binary(A); bin(B) when is_binary(B) -> B. diff --git a/apps/emqx_rule_engine/src/emqx_rule_api_schema.erl b/apps/emqx_rule_engine/src/emqx_rule_api_schema.erl index 1caa8da23..d992cdc07 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_api_schema.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_api_schema.erl @@ -68,6 +68,7 @@ fields("rule_events") -> fields("rule_test") -> [ {"context", sc(hoconsc:union([ ref("ctx_pub") , ref("ctx_sub") + , ref("ctx_unsub") , ref("ctx_delivered") , ref("ctx_acked") , ref("ctx_dropped") diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl b/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl index 205f85488..cbfda16db 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl @@ -257,11 +257,16 @@ format_output(Outputs) -> [do_format_output(Out) || Out <- Outputs]. do_format_output(#{mod := Mod, func := Func, args := Args}) -> - #{function => list_to_binary(lists:concat([Mod,":",Func])), + #{function => printable_function_name(Mod, Func), args => maps:remove(preprocessed_tmpl, Args)}; do_format_output(BridgeChannelId) when is_binary(BridgeChannelId) -> BridgeChannelId. +printable_function_name(emqx_rule_outputs, Func) -> + Func; +printable_function_name(Mod, Func) -> + list_to_binary(lists:concat([Mod,":",Func])). + get_rule_metrics(Id) -> Format = fun (Node, #{matched := Matched, rate := Current, diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine_schema.erl b/apps/emqx_rule_engine/src/emqx_rule_engine_schema.erl index ba516bfa7..5d72d5a6d 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine_schema.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine_schema.erl @@ -182,6 +182,7 @@ rule_name() -> {"name", sc(binary(), #{ desc => "The name of the rule" , default => "" + , nullable => false , example => "foo" })}.