chore(gw): improve http error messages
This commit is contained in:
parent
3fd9061418
commit
6d4aac1600
|
@ -83,7 +83,7 @@ gateway(post, Request) ->
|
||||||
{ok, NGwConf} ->
|
{ok, NGwConf} ->
|
||||||
{201, NGwConf};
|
{201, NGwConf};
|
||||||
{error, Reason} ->
|
{error, Reason} ->
|
||||||
return_http_error(500, Reason)
|
emqx_gateway_http:reason2resp(Reason)
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
catch
|
catch
|
||||||
|
|
|
@ -745,7 +745,8 @@ common_client_props() ->
|
||||||
"due to exceeding the length">>})}
|
"due to exceeding the length">>})}
|
||||||
, {awaiting_rel_cnt,
|
, {awaiting_rel_cnt,
|
||||||
mk(integer(),
|
mk(integer(),
|
||||||
#{ desc => <<"Number of awaiting PUBREC packet">>})}
|
%% FIXME: PUBREC ??
|
||||||
|
#{ desc => <<"Number of awaiting acknowledge packet">>})}
|
||||||
, {awaiting_rel_max,
|
, {awaiting_rel_max,
|
||||||
mk(integer(),
|
mk(integer(),
|
||||||
#{ desc => <<"Maximum allowed number of awaiting PUBREC "
|
#{ desc => <<"Maximum allowed number of awaiting PUBREC "
|
||||||
|
@ -755,25 +756,25 @@ common_client_props() ->
|
||||||
#{ desc => <<"Number of bytes received by EMQ X Broker">>})}
|
#{ desc => <<"Number of bytes received by EMQ X Broker">>})}
|
||||||
, {recv_cnt,
|
, {recv_cnt,
|
||||||
mk(integer(),
|
mk(integer(),
|
||||||
#{ desc => <<"Number of TCP packets received">>})}
|
#{ desc => <<"Number of socket packets received">>})}
|
||||||
, {recv_pkt,
|
, {recv_pkt,
|
||||||
mk(integer(),
|
mk(integer(),
|
||||||
#{ desc => <<"Number of MQTT packets received">>})}
|
#{ desc => <<"Number of protocol packets received">>})}
|
||||||
, {recv_msg,
|
, {recv_msg,
|
||||||
mk(integer(),
|
mk(integer(),
|
||||||
#{ desc => <<"Number of PUBLISH packets received">>})}
|
#{ desc => <<"Number of message packets received">>})}
|
||||||
, {send_oct,
|
, {send_oct,
|
||||||
mk(integer(),
|
mk(integer(),
|
||||||
#{ desc => <<"Number of bytes sent">>})}
|
#{ desc => <<"Number of bytes sent">>})}
|
||||||
, {send_cnt,
|
, {send_cnt,
|
||||||
mk(integer(),
|
mk(integer(),
|
||||||
#{ desc => <<"Number of TCP packets sent">>})}
|
#{ desc => <<"Number of socket packets sent">>})}
|
||||||
, {send_pkt,
|
, {send_pkt,
|
||||||
mk(integer(),
|
mk(integer(),
|
||||||
#{ desc => <<"Number of MQTT packets sent">>})}
|
#{ desc => <<"Number of protocol packets sent">>})}
|
||||||
, {send_msg,
|
, {send_msg,
|
||||||
mk(integer(),
|
mk(integer(),
|
||||||
#{ desc => <<"Number of PUBLISH packets sent">>})}
|
#{ desc => <<"Number of message packets sent">>})}
|
||||||
, {mailbox_len,
|
, {mailbox_len,
|
||||||
mk(integer(),
|
mk(integer(),
|
||||||
#{ desc => <<"Process mailbox size">>})}
|
#{ desc => <<"Process mailbox size">>})}
|
||||||
|
|
|
@ -248,7 +248,8 @@ update(Req) ->
|
||||||
res(emqx_conf:update([gateway], Req, #{override_to => cluster})).
|
res(emqx_conf:update([gateway], Req, #{override_to => cluster})).
|
||||||
|
|
||||||
res({ok, Result}) -> {ok, Result};
|
res({ok, Result}) -> {ok, Result};
|
||||||
res({error, {pre_config_update,emqx_gateway_conf,Reason}}) -> {error, Reason};
|
res({error, {pre_config_update,?MODULE,Reason}}) -> {error, Reason};
|
||||||
|
res({error, {post_config_update,?MODULE,Reason}}) -> {error, Reason};
|
||||||
res({error, Reason}) -> {error, Reason}.
|
res({error, Reason}) -> {error, Reason}.
|
||||||
|
|
||||||
bin({LType, LName}) ->
|
bin({LType, LName}) ->
|
||||||
|
@ -314,12 +315,12 @@ pre_config_update(_, {load_gateway, GwName, Conf}, RawConf) ->
|
||||||
NConf = tune_gw_certs(fun convert_certs/2, GwName, Conf),
|
NConf = tune_gw_certs(fun convert_certs/2, GwName, Conf),
|
||||||
{ok, emqx_map_lib:deep_merge(RawConf, #{GwName => NConf})};
|
{ok, emqx_map_lib:deep_merge(RawConf, #{GwName => NConf})};
|
||||||
_ ->
|
_ ->
|
||||||
{error, already_exist}
|
badres_gateway(already_exist, GwName)
|
||||||
end;
|
end;
|
||||||
pre_config_update(_, {update_gateway, GwName, Conf}, RawConf) ->
|
pre_config_update(_, {update_gateway, GwName, Conf}, RawConf) ->
|
||||||
case maps:get(GwName, RawConf, undefined) of
|
case maps:get(GwName, RawConf, undefined) of
|
||||||
undefined ->
|
undefined ->
|
||||||
{error, not_found};
|
badres_gateway(not_found, GwName);
|
||||||
_ ->
|
_ ->
|
||||||
NConf = maps:without([<<"listeners">>, ?AUTHN_BIN], Conf),
|
NConf = maps:without([<<"listeners">>, ?AUTHN_BIN], Conf),
|
||||||
{ok, emqx_map_lib:deep_merge(RawConf, #{GwName => NConf})}
|
{ok, emqx_map_lib:deep_merge(RawConf, #{GwName => NConf})}
|
||||||
|
@ -341,13 +342,13 @@ pre_config_update(_, {add_listener, GwName, {LType, LName}, Conf}, RawConf) ->
|
||||||
RawConf,
|
RawConf,
|
||||||
#{GwName => #{<<"listeners">> => NListener}})};
|
#{GwName => #{<<"listeners">> => NListener}})};
|
||||||
_ ->
|
_ ->
|
||||||
{error, already_exist}
|
badres_listener(already_exist, GwName, LType, LName)
|
||||||
end;
|
end;
|
||||||
pre_config_update(_, {update_listener, GwName, {LType, LName}, Conf}, RawConf) ->
|
pre_config_update(_, {update_listener, GwName, {LType, LName}, Conf}, RawConf) ->
|
||||||
case emqx_map_lib:deep_get(
|
case emqx_map_lib:deep_get(
|
||||||
[GwName, <<"listeners">>, LType, LName], RawConf, undefined) of
|
[GwName, <<"listeners">>, LType, LName], RawConf, undefined) of
|
||||||
undefined ->
|
undefined ->
|
||||||
{error, not_found};
|
badres_listener(not_found, GwName, LType, LName);
|
||||||
OldConf ->
|
OldConf ->
|
||||||
NConf = convert_certs(certs_dir(GwName), Conf, OldConf),
|
NConf = convert_certs(certs_dir(GwName), Conf, OldConf),
|
||||||
NListener = #{LType => #{LName => NConf}},
|
NListener = #{LType => #{LName => NConf}},
|
||||||
|
@ -374,14 +375,14 @@ pre_config_update(_, {add_authn, GwName, Conf}, RawConf) ->
|
||||||
RawConf,
|
RawConf,
|
||||||
#{GwName => #{?AUTHN_BIN => Conf}})};
|
#{GwName => #{?AUTHN_BIN => Conf}})};
|
||||||
_ ->
|
_ ->
|
||||||
{error, already_exist}
|
badres_authn(already_exist, GwName)
|
||||||
end;
|
end;
|
||||||
pre_config_update(_, {add_authn, GwName, {LType, LName}, Conf}, RawConf) ->
|
pre_config_update(_, {add_authn, GwName, {LType, LName}, Conf}, RawConf) ->
|
||||||
case emqx_map_lib:deep_get(
|
case emqx_map_lib:deep_get(
|
||||||
[GwName, <<"listeners">>, LType, LName],
|
[GwName, <<"listeners">>, LType, LName],
|
||||||
RawConf, undefined) of
|
RawConf, undefined) of
|
||||||
undefined ->
|
undefined ->
|
||||||
{error, not_found};
|
badres_listener(not_found, GwName, LType, LName);
|
||||||
Listener ->
|
Listener ->
|
||||||
case maps:get(?AUTHN_BIN, Listener, undefined) of
|
case maps:get(?AUTHN_BIN, Listener, undefined) of
|
||||||
undefined ->
|
undefined ->
|
||||||
|
@ -391,14 +392,14 @@ pre_config_update(_, {add_authn, GwName, {LType, LName}, Conf}, RawConf) ->
|
||||||
#{LType => #{LName => NListener}}}},
|
#{LType => #{LName => NListener}}}},
|
||||||
{ok, emqx_map_lib:deep_merge(RawConf, NGateway)};
|
{ok, emqx_map_lib:deep_merge(RawConf, NGateway)};
|
||||||
_ ->
|
_ ->
|
||||||
{error, already_exist}
|
badres_listener_authn(already_exist, GwName, LType, LName)
|
||||||
end
|
end
|
||||||
end;
|
end;
|
||||||
pre_config_update(_, {update_authn, GwName, Conf}, RawConf) ->
|
pre_config_update(_, {update_authn, GwName, Conf}, RawConf) ->
|
||||||
case emqx_map_lib:deep_get(
|
case emqx_map_lib:deep_get(
|
||||||
[GwName, ?AUTHN_BIN], RawConf, undefined) of
|
[GwName, ?AUTHN_BIN], RawConf, undefined) of
|
||||||
undefined ->
|
undefined ->
|
||||||
{error, not_found};
|
badres_authn(not_found, GwName);
|
||||||
_ ->
|
_ ->
|
||||||
{ok, emqx_map_lib:deep_merge(
|
{ok, emqx_map_lib:deep_merge(
|
||||||
RawConf,
|
RawConf,
|
||||||
|
@ -409,11 +410,11 @@ pre_config_update(_, {update_authn, GwName, {LType, LName}, Conf}, RawConf) ->
|
||||||
[GwName, <<"listeners">>, LType, LName],
|
[GwName, <<"listeners">>, LType, LName],
|
||||||
RawConf, undefined) of
|
RawConf, undefined) of
|
||||||
undefined ->
|
undefined ->
|
||||||
{error, not_found};
|
badres_listener(not_found, GwName, LType, LName);
|
||||||
Listener ->
|
Listener ->
|
||||||
case maps:get(?AUTHN_BIN, Listener, undefined) of
|
case maps:get(?AUTHN_BIN, Listener, undefined) of
|
||||||
undefined ->
|
undefined ->
|
||||||
{error, not_found};
|
badres_listener_authn(not_found, GwName, LType, LName);
|
||||||
Auth ->
|
Auth ->
|
||||||
NListener = maps:put(
|
NListener = maps:put(
|
||||||
?AUTHN_BIN,
|
?AUTHN_BIN,
|
||||||
|
@ -437,6 +438,38 @@ pre_config_update(_, UnknownReq, _RawConf) ->
|
||||||
logger:error("Unknown configuration update request: ~0p", [UnknownReq]),
|
logger:error("Unknown configuration update request: ~0p", [UnknownReq]),
|
||||||
{error, badreq}.
|
{error, badreq}.
|
||||||
|
|
||||||
|
badres_gateway(not_found, GwName) ->
|
||||||
|
{error, {badres, #{resource => gateway, gateway => GwName,
|
||||||
|
reason => not_found}}};
|
||||||
|
badres_gateway(already_exist, GwName) ->
|
||||||
|
{error, {badres, #{resource => gateway, gateway => GwName,
|
||||||
|
reason => already_exist}}}.
|
||||||
|
|
||||||
|
badres_listener(not_found, GwName, LType, LName) ->
|
||||||
|
{error, {badres, #{resource => listener, gateway => GwName,
|
||||||
|
listener => {GwName, LType, LName},
|
||||||
|
reason => not_found}}};
|
||||||
|
badres_listener(already_exist, GwName, LType, LName) ->
|
||||||
|
{error, {badres, #{resource => listener, gateway => GwName,
|
||||||
|
listener => {GwName, LType, LName},
|
||||||
|
reason => already_exist}}}.
|
||||||
|
|
||||||
|
badres_authn(not_found, GwName) ->
|
||||||
|
{error, {badres, #{resource => authn, gateway => GwName,
|
||||||
|
reason => not_found}}};
|
||||||
|
badres_authn(already_exist, GwName) ->
|
||||||
|
{error, {badres, #{resource => authn, gateway => GwName,
|
||||||
|
reason => already_exist}}}.
|
||||||
|
|
||||||
|
badres_listener_authn(not_found, GwName, LType, LName) ->
|
||||||
|
{error, {badres, #{resource => listener_authn, gateway => GwName,
|
||||||
|
listener => {GwName, LType, LName},
|
||||||
|
reason => not_found}}};
|
||||||
|
badres_listener_authn(already_exist, GwName, LType, LName) ->
|
||||||
|
{error, {badres, #{resource => listener_authn, gateway => GwName,
|
||||||
|
listener => {GwName, LType, LName},
|
||||||
|
reason => already_exist}}}.
|
||||||
|
|
||||||
-spec post_config_update(list(atom()),
|
-spec post_config_update(list(atom()),
|
||||||
emqx_config:update_request(),
|
emqx_config:update_request(),
|
||||||
emqx_config:config(),
|
emqx_config:config(),
|
||||||
|
|
|
@ -23,6 +23,8 @@
|
||||||
|
|
||||||
-define(AUTHN, ?EMQX_AUTHENTICATION_CONFIG_ROOT_NAME_ATOM).
|
-define(AUTHN, ?EMQX_AUTHENTICATION_CONFIG_ROOT_NAME_ATOM).
|
||||||
|
|
||||||
|
-import(emqx_gateway_utils, [listener_id/3]).
|
||||||
|
|
||||||
%% Mgmt APIs - gateway
|
%% Mgmt APIs - gateway
|
||||||
-export([ gateways/1
|
-export([ gateways/1
|
||||||
]).
|
]).
|
||||||
|
@ -59,10 +61,7 @@
|
||||||
, with_authn/2
|
, with_authn/2
|
||||||
, with_listener_authn/3
|
, with_listener_authn/3
|
||||||
, checks/2
|
, checks/2
|
||||||
, schema_bad_request/0
|
, reason2resp/1
|
||||||
, schema_not_found/0
|
|
||||||
, schema_internal_error/0
|
|
||||||
, schema_no_content/0
|
|
||||||
]).
|
]).
|
||||||
|
|
||||||
-type gateway_summary() ::
|
-type gateway_summary() ::
|
||||||
|
@ -131,7 +130,7 @@ current_connections_count(GwName) ->
|
||||||
get_listeners_status(GwName, Config) ->
|
get_listeners_status(GwName, Config) ->
|
||||||
Listeners = emqx_gateway_utils:normalize_config(Config),
|
Listeners = emqx_gateway_utils:normalize_config(Config),
|
||||||
lists:map(fun({Type, LisName, ListenOn, _, _}) ->
|
lists:map(fun({Type, LisName, ListenOn, _, _}) ->
|
||||||
Name0 = emqx_gateway_utils:listener_id(GwName, Type, LisName),
|
Name0 = listener_id(GwName, Type, LisName),
|
||||||
Name = {Name0, ListenOn},
|
Name = {Name0, ListenOn},
|
||||||
LisO = #{id => Name0, type => Type, name => LisName},
|
LisO = #{id => Name0, type => Type, name => LisName},
|
||||||
case catch esockd:listener(Name) of
|
case catch esockd:listener(Name) of
|
||||||
|
@ -223,12 +222,7 @@ remove_authn(GwName, ListenerId) ->
|
||||||
|
|
||||||
confexp(ok) -> ok;
|
confexp(ok) -> ok;
|
||||||
confexp({ok, Res}) -> {ok, Res};
|
confexp({ok, Res}) -> {ok, Res};
|
||||||
confexp({error, badarg}) ->
|
confexp({error, Reason}) -> error(Reason).
|
||||||
error({update_conf_error, badarg});
|
|
||||||
confexp({error, not_found}) ->
|
|
||||||
error({update_conf_error, not_found});
|
|
||||||
confexp({error, already_exist}) ->
|
|
||||||
error({update_conf_error, already_exist}).
|
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Mgmt APIs - clients
|
%% Mgmt APIs - clients
|
||||||
|
@ -322,6 +316,59 @@ with_channel(GwName, ClientId, Fun) ->
|
||||||
%% Utils
|
%% Utils
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
-spec reason2resp({atom(), map()} | any()) -> binary() | any().
|
||||||
|
reason2resp({badconf, #{key := Key, value := Value, reason := Reason}}) ->
|
||||||
|
fmt400err("Bad config value '~s' for '~s', reason: ~s",
|
||||||
|
[Value, Key, Reason]);
|
||||||
|
reason2resp({badres, #{resource := gateway,
|
||||||
|
gateway := GwName,
|
||||||
|
reason := not_found}}) ->
|
||||||
|
fmt400err("The ~s gateway is unloaded", [GwName]);
|
||||||
|
|
||||||
|
reason2resp({badres, #{resource := gateway,
|
||||||
|
gateway := GwName,
|
||||||
|
reason := already_exist}}) ->
|
||||||
|
fmt400err("The ~s gateway has loaded", [GwName]);
|
||||||
|
|
||||||
|
reason2resp({badres, #{resource := listener,
|
||||||
|
listener := {GwName, LType, LName},
|
||||||
|
reason := not_found}}) ->
|
||||||
|
fmt400err("Listener ~s not found",
|
||||||
|
[listener_id(GwName, LType, LName)]);
|
||||||
|
|
||||||
|
reason2resp({badres, #{resource := listener,
|
||||||
|
listener := {GwName, LType, LName},
|
||||||
|
reason := already_exist}}) ->
|
||||||
|
fmt400err("The listener ~s of ~s already exist",
|
||||||
|
[listener_id(GwName, LType, LName), GwName]);
|
||||||
|
|
||||||
|
reason2resp({badres, #{resource := authn,
|
||||||
|
gateway := GwName,
|
||||||
|
reason := not_found}}) ->
|
||||||
|
fmt400err("The authentication not found on ~s", [GwName]);
|
||||||
|
|
||||||
|
reason2resp({badres, #{resource := authn,
|
||||||
|
gateway := GwName,
|
||||||
|
reason := already_exist}}) ->
|
||||||
|
fmt400err("The authentication already exist on ~s", [GwName]);
|
||||||
|
|
||||||
|
reason2resp({badres, #{resource := listener_authn,
|
||||||
|
listener := {GwName, LType, LName},
|
||||||
|
reason := not_found}}) ->
|
||||||
|
fmt400err("The authentication not found on ~s",
|
||||||
|
[listener_id(GwName, LType, LName)]);
|
||||||
|
|
||||||
|
reason2resp({badres, #{resource := listener_authn,
|
||||||
|
listener := {GwName, LType, LName},
|
||||||
|
reason := already_exist}}) ->
|
||||||
|
fmt400err("The authentication already exist on ~s",
|
||||||
|
[listener_id(GwName, LType, LName)]);
|
||||||
|
|
||||||
|
reason2resp(R) -> return_http_error(500, R).
|
||||||
|
|
||||||
|
fmt400err(Fmt, Args) ->
|
||||||
|
return_http_error(400, io_lib:format(Fmt, Args)).
|
||||||
|
|
||||||
-spec return_http_error(integer(), any()) -> {integer(), binary()}.
|
-spec return_http_error(integer(), any()) -> {integer(), binary()}.
|
||||||
return_http_error(Code, Msg) ->
|
return_http_error(Code, Msg) ->
|
||||||
{Code, emqx_json:encode(
|
{Code, emqx_json:encode(
|
||||||
|
@ -378,19 +425,12 @@ with_gateway(GwName0, Fun) ->
|
||||||
Path = lists:concat(
|
Path = lists:concat(
|
||||||
lists:join(".", lists:map(fun to_list/1, Path0))),
|
lists:join(".", lists:map(fun to_list/1, Path0))),
|
||||||
return_http_error(404, "Resource not found. path: " ++ Path);
|
return_http_error(404, "Resource not found. path: " ++ Path);
|
||||||
%% Exceptions from: confexp/1
|
|
||||||
error : {update_conf_error, badarg} ->
|
|
||||||
return_http_error(400, "Bad arguments");
|
|
||||||
error : {update_conf_error, not_found} ->
|
|
||||||
return_http_error(404, "Resource not found");
|
|
||||||
error : {update_conf_error, already_exist} ->
|
|
||||||
return_http_error(400, "Resource already exist");
|
|
||||||
Class : Reason : Stk ->
|
Class : Reason : Stk ->
|
||||||
?SLOG(error, #{ msg => "uncatched_error"
|
?SLOG(error, #{ msg => "uncatched_error"
|
||||||
, reason => {Class, Reason}
|
, reason => {Class, Reason}
|
||||||
, stacktrace => Stk
|
, stacktrace => Stk
|
||||||
}),
|
}),
|
||||||
return_http_error(500, {Class, Reason, Stk})
|
reason2resp(Reason)
|
||||||
end.
|
end.
|
||||||
|
|
||||||
-spec checks(list(), map()) -> ok.
|
-spec checks(list(), map()) -> ok.
|
||||||
|
@ -408,20 +448,6 @@ to_list(A) when is_atom(A) ->
|
||||||
to_list(B) when is_binary(B) ->
|
to_list(B) when is_binary(B) ->
|
||||||
binary_to_list(B).
|
binary_to_list(B).
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
%% common schemas
|
|
||||||
|
|
||||||
schema_bad_request() ->
|
|
||||||
emqx_mgmt_util:error_schema(
|
|
||||||
<<"Some Params missed">>, ['PARAMETER_MISSED']).
|
|
||||||
schema_internal_error() ->
|
|
||||||
emqx_mgmt_util:error_schema(
|
|
||||||
<<"Ineternal Server Error">>, ['INTERNAL_SERVER_ERROR']).
|
|
||||||
schema_not_found() ->
|
|
||||||
emqx_mgmt_util:error_schema(<<"Resource Not Found">>).
|
|
||||||
schema_no_content() ->
|
|
||||||
#{description => <<"No Content">>}.
|
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Internal funcs
|
%% Internal funcs
|
||||||
|
|
||||||
|
|
|
@ -112,7 +112,7 @@ init([Gateway, Ctx, _GwDscrptr]) ->
|
||||||
true ->
|
true ->
|
||||||
case cb_gateway_load(State) of
|
case cb_gateway_load(State) of
|
||||||
{error, Reason} ->
|
{error, Reason} ->
|
||||||
{stop, {load_gateway_failure, Reason}};
|
{stop, Reason};
|
||||||
{ok, NState} ->
|
{ok, NState} ->
|
||||||
{ok, NState}
|
{ok, NState}
|
||||||
end
|
end
|
||||||
|
@ -360,7 +360,7 @@ cb_gateway_unload(State = #state{name = GwName,
|
||||||
, reason => {Class, Reason}
|
, reason => {Class, Reason}
|
||||||
, stacktrace => Stk
|
, stacktrace => Stk
|
||||||
}),
|
}),
|
||||||
{error, {Class, Reason, Stk}}
|
{error, Reason}
|
||||||
after
|
after
|
||||||
_ = do_deinit_authn(State#state.authns)
|
_ = do_deinit_authn(State#state.authns)
|
||||||
end.
|
end.
|
||||||
|
@ -381,7 +381,7 @@ cb_gateway_load(State = #state{name = GwName,
|
||||||
case CbMod:on_gateway_load(Gateway, NCtx) of
|
case CbMod:on_gateway_load(Gateway, NCtx) of
|
||||||
{error, Reason} ->
|
{error, Reason} ->
|
||||||
do_deinit_authn(AuthnNames),
|
do_deinit_authn(AuthnNames),
|
||||||
throw({callback_return_error, Reason});
|
{error, Reason};
|
||||||
{ok, ChildPidOrSpecs, GwState} ->
|
{ok, ChildPidOrSpecs, GwState} ->
|
||||||
ChildPids = start_child_process(ChildPidOrSpecs),
|
ChildPids = start_child_process(ChildPidOrSpecs),
|
||||||
{ok, State#state{
|
{ok, State#state{
|
||||||
|
@ -403,7 +403,7 @@ cb_gateway_load(State = #state{name = GwName,
|
||||||
, reason => {Class, Reason1}
|
, reason => {Class, Reason1}
|
||||||
, stacktrace => Stk
|
, stacktrace => Stk
|
||||||
}),
|
}),
|
||||||
{error, {Class, Reason1, Stk}}
|
{error, Reason1}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
cb_gateway_update(Config,
|
cb_gateway_update(Config,
|
||||||
|
@ -412,7 +412,7 @@ cb_gateway_update(Config,
|
||||||
try
|
try
|
||||||
#{cbkmod := CbMod} = emqx_gateway_registry:lookup(GwName),
|
#{cbkmod := CbMod} = emqx_gateway_registry:lookup(GwName),
|
||||||
case CbMod:on_gateway_update(Config, detailed_gateway_info(State), GwState) of
|
case CbMod:on_gateway_update(Config, detailed_gateway_info(State), GwState) of
|
||||||
{error, Reason} -> throw({callback_return_error, Reason});
|
{error, Reason} -> {error, Reason};
|
||||||
{ok, ChildPidOrSpecs, NGwState} ->
|
{ok, ChildPidOrSpecs, NGwState} ->
|
||||||
%% XXX: Hot-upgrade ???
|
%% XXX: Hot-upgrade ???
|
||||||
ChildPids = start_child_process(ChildPidOrSpecs),
|
ChildPids = start_child_process(ChildPidOrSpecs),
|
||||||
|
@ -430,7 +430,7 @@ cb_gateway_update(Config,
|
||||||
, reason => {Class, Reason1}
|
, reason => {Class, Reason1}
|
||||||
, stacktrace => Stk
|
, stacktrace => Stk
|
||||||
}),
|
}),
|
||||||
{error, {Class, Reason1, Stk}}
|
{error, Reason1}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
start_child_process([]) -> [];
|
start_child_process([]) -> [];
|
||||||
|
|
|
@ -90,6 +90,7 @@ childspec(Id, Type, Mod, Args) ->
|
||||||
-> {ok, pid()}
|
-> {ok, pid()}
|
||||||
| {error, supervisor:startchild_err()}.
|
| {error, supervisor:startchild_err()}.
|
||||||
supervisor_ret({ok, Pid, _Info}) -> {ok, Pid};
|
supervisor_ret({ok, Pid, _Info}) -> {ok, Pid};
|
||||||
|
supervisor_ret({error, {Reason, _Child}}) -> {error, Reason};
|
||||||
supervisor_ret(Ret) -> Ret.
|
supervisor_ret(Ret) -> Ret.
|
||||||
|
|
||||||
-spec find_sup_child(Sup :: pid() | atom(), ChildId :: supervisor:child_id())
|
-spec find_sup_child(Sup :: pid() | atom(), ChildId :: supervisor:child_id())
|
||||||
|
|
|
@ -75,7 +75,13 @@ stop_grpc_server(GwName) ->
|
||||||
start_grpc_client_channel(_GwName, undefined) ->
|
start_grpc_client_channel(_GwName, undefined) ->
|
||||||
undefined;
|
undefined;
|
||||||
start_grpc_client_channel(GwName, Options = #{address := Address}) ->
|
start_grpc_client_channel(GwName, Options = #{address := Address}) ->
|
||||||
{Host, Port} = emqx_gateway_utils:parse_address(Address),
|
{Host, Port} = try emqx_gateway_utils:parse_address(Address)
|
||||||
|
catch error : badarg ->
|
||||||
|
throw({badconf, #{key => address,
|
||||||
|
value => Address,
|
||||||
|
reason => illegal_grpc_address
|
||||||
|
}})
|
||||||
|
end,
|
||||||
case maps:to_list(maps:get(ssl, Options, #{})) of
|
case maps:to_list(maps:get(ssl, Options, #{})) of
|
||||||
[] ->
|
[] ->
|
||||||
SvrAddr = compose_http_uri(http, Host, Port),
|
SvrAddr = compose_http_uri(http, Host, Port),
|
||||||
|
|
|
@ -50,14 +50,20 @@ unreg() ->
|
||||||
on_gateway_load(_Gateway = #{ name := GwName,
|
on_gateway_load(_Gateway = #{ name := GwName,
|
||||||
config := Config
|
config := Config
|
||||||
}, Ctx) ->
|
}, Ctx) ->
|
||||||
%% Xml registry
|
XmlDir = maps:get(xml_dir, Config),
|
||||||
{ok, RegPid} = emqx_lwm2m_xml_object_db:start_link(maps:get(xml_dir, Config)),
|
case emqx_lwm2m_xml_object_db:start_link(XmlDir) of
|
||||||
|
{ok, RegPid} ->
|
||||||
Listeners = emqx_gateway_utils:normalize_config(Config),
|
Listeners = emqx_gateway_utils:normalize_config(Config),
|
||||||
ListenerPids = lists:map(fun(Lis) ->
|
ListenerPids = lists:map(fun(Lis) ->
|
||||||
start_listener(GwName, Ctx, Lis)
|
start_listener(GwName, Ctx, Lis)
|
||||||
end, Listeners),
|
end, Listeners),
|
||||||
{ok, ListenerPids, _GwState = #{ctx => Ctx, registry => RegPid}}.
|
{ok, ListenerPids, _GwState = #{ctx => Ctx, registry => RegPid}};
|
||||||
|
{error, Reason} ->
|
||||||
|
throw({badconf, #{ key => xml_dir
|
||||||
|
, value => XmlDir
|
||||||
|
, reason => Reason
|
||||||
|
}})
|
||||||
|
end.
|
||||||
|
|
||||||
on_gateway_update(Config, Gateway, GwState = #{ctx := Ctx}) ->
|
on_gateway_update(Config, Gateway, GwState = #{ctx := Ctx}) ->
|
||||||
GwName = maps:get(name, Gateway),
|
GwName = maps:get(name, Gateway),
|
||||||
|
|
|
@ -47,6 +47,11 @@
|
||||||
%% API Function Definitions
|
%% API Function Definitions
|
||||||
%% ------------------------------------------------------------------
|
%% ------------------------------------------------------------------
|
||||||
|
|
||||||
|
-spec start_link(string())
|
||||||
|
-> {ok, pid()}
|
||||||
|
| ignore
|
||||||
|
| {error, no_xml_files_found}
|
||||||
|
| {error, term()}.
|
||||||
start_link(XmlDir) ->
|
start_link(XmlDir) ->
|
||||||
gen_server:start_link({local, ?MODULE}, ?MODULE, [XmlDir], []).
|
gen_server:start_link({local, ?MODULE}, ?MODULE, [XmlDir], []).
|
||||||
|
|
||||||
|
@ -85,8 +90,11 @@ stop() ->
|
||||||
init([XmlDir]) ->
|
init([XmlDir]) ->
|
||||||
_ = ets:new(?LWM2M_OBJECT_DEF_TAB, [set, named_table, protected]),
|
_ = ets:new(?LWM2M_OBJECT_DEF_TAB, [set, named_table, protected]),
|
||||||
_ = ets:new(?LWM2M_OBJECT_NAME_TO_ID_TAB, [set, named_table, protected]),
|
_ = ets:new(?LWM2M_OBJECT_NAME_TO_ID_TAB, [set, named_table, protected]),
|
||||||
load(XmlDir),
|
case load(XmlDir) of
|
||||||
{ok, #state{}}.
|
ok ->
|
||||||
|
{ok, #state{}};
|
||||||
|
{error, Reason} -> {stop, Reason}
|
||||||
|
end.
|
||||||
|
|
||||||
handle_call(_Request, _From, State) ->
|
handle_call(_Request, _From, State) ->
|
||||||
{reply, ignored, State}.
|
{reply, ignored, State}.
|
||||||
|
@ -116,7 +124,7 @@ load(BaseDir) ->
|
||||||
Wild
|
Wild
|
||||||
end,
|
end,
|
||||||
case filelib:wildcard(Wild2) of
|
case filelib:wildcard(Wild2) of
|
||||||
[] -> error(no_xml_files_found, BaseDir);
|
[] -> {error, no_xml_files_found};
|
||||||
AllXmlFiles -> load_loop(AllXmlFiles)
|
AllXmlFiles -> load_loop(AllXmlFiles)
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
|
|
@ -245,8 +245,9 @@ t_load_unload_gateway(_) ->
|
||||||
?CONF_STOMP_AUTHN_1,
|
?CONF_STOMP_AUTHN_1,
|
||||||
?CONF_STOMP_LISTENER_1),
|
?CONF_STOMP_LISTENER_1),
|
||||||
{ok, _} = emqx_gateway_conf:load_gateway(stomp, StompConf1),
|
{ok, _} = emqx_gateway_conf:load_gateway(stomp, StompConf1),
|
||||||
{error, already_exist} =
|
?assertMatch(
|
||||||
emqx_gateway_conf:load_gateway(stomp, StompConf1),
|
{error, {badres, #{reason := already_exist}}},
|
||||||
|
emqx_gateway_conf:load_gateway(stomp, StompConf1)),
|
||||||
assert_confs(StompConf1, emqx:get_raw_config([gateway, stomp])),
|
assert_confs(StompConf1, emqx:get_raw_config([gateway, stomp])),
|
||||||
|
|
||||||
{ok, _} = emqx_gateway_conf:update_gateway(stomp, StompConf2),
|
{ok, _} = emqx_gateway_conf:update_gateway(stomp, StompConf2),
|
||||||
|
@ -255,8 +256,9 @@ t_load_unload_gateway(_) ->
|
||||||
ok = emqx_gateway_conf:unload_gateway(stomp),
|
ok = emqx_gateway_conf:unload_gateway(stomp),
|
||||||
ok = emqx_gateway_conf:unload_gateway(stomp),
|
ok = emqx_gateway_conf:unload_gateway(stomp),
|
||||||
|
|
||||||
{error, not_found} =
|
?assertMatch(
|
||||||
emqx_gateway_conf:update_gateway(stomp, StompConf2),
|
{error, {badres, #{reason := not_found}}},
|
||||||
|
emqx_gateway_conf:update_gateway(stomp, StompConf2)),
|
||||||
|
|
||||||
?assertException(error, {config_not_found, [gateway, stomp]},
|
?assertException(error, {config_not_found, [gateway, stomp]},
|
||||||
emqx:get_raw_config([gateway, stomp])),
|
emqx:get_raw_config([gateway, stomp])),
|
||||||
|
@ -280,8 +282,9 @@ t_load_remove_authn(_) ->
|
||||||
|
|
||||||
ok = emqx_gateway_conf:remove_authn(<<"stomp">>),
|
ok = emqx_gateway_conf:remove_authn(<<"stomp">>),
|
||||||
|
|
||||||
{error, not_found} =
|
?assertMatch(
|
||||||
emqx_gateway_conf:update_authn(<<"stomp">>, ?CONF_STOMP_AUTHN_2),
|
{error, {badres, #{reason := not_found}}},
|
||||||
|
emqx_gateway_conf:update_authn(<<"stomp">>, ?CONF_STOMP_AUTHN_2)),
|
||||||
|
|
||||||
?assertException(
|
?assertException(
|
||||||
error, {config_not_found, [gateway, stomp, authentication]},
|
error, {config_not_found, [gateway, stomp, authentication]},
|
||||||
|
@ -312,9 +315,10 @@ t_load_remove_listeners(_) ->
|
||||||
ok = emqx_gateway_conf:remove_listener(
|
ok = emqx_gateway_conf:remove_listener(
|
||||||
<<"stomp">>, {<<"tcp">>, <<"default">>}),
|
<<"stomp">>, {<<"tcp">>, <<"default">>}),
|
||||||
|
|
||||||
{error, not_found} =
|
?assertMatch(
|
||||||
|
{error, {badres, #{reason := not_found}}},
|
||||||
emqx_gateway_conf:update_listener(
|
emqx_gateway_conf:update_listener(
|
||||||
<<"stomp">>, {<<"tcp">>, <<"default">>}, ?CONF_STOMP_LISTENER_2),
|
<<"stomp">>, {<<"tcp">>, <<"default">>}, ?CONF_STOMP_LISTENER_2)),
|
||||||
|
|
||||||
?assertException(
|
?assertException(
|
||||||
error, {config_not_found, [gateway, stomp, listeners, tcp, default]},
|
error, {config_not_found, [gateway, stomp, listeners, tcp, default]},
|
||||||
|
@ -352,9 +356,10 @@ t_load_remove_listener_authn(_) ->
|
||||||
ok = emqx_gateway_conf:remove_authn(
|
ok = emqx_gateway_conf:remove_authn(
|
||||||
<<"stomp">>, {<<"tcp">>, <<"default">>}),
|
<<"stomp">>, {<<"tcp">>, <<"default">>}),
|
||||||
|
|
||||||
{error, not_found} =
|
?assertMatch(
|
||||||
|
{error, {badres, #{reason := not_found}}},
|
||||||
emqx_gateway_conf:update_authn(
|
emqx_gateway_conf:update_authn(
|
||||||
<<"stomp">>, {<<"tcp">>, <<"default">>}, ?CONF_STOMP_AUTHN_2),
|
<<"stomp">>, {<<"tcp">>, <<"default">>}, ?CONF_STOMP_AUTHN_2)),
|
||||||
|
|
||||||
Path = [gateway, stomp, listeners, tcp, default, authentication],
|
Path = [gateway, stomp, listeners, tcp, default, authentication],
|
||||||
?assertException(
|
?assertException(
|
||||||
|
@ -426,9 +431,12 @@ t_add_listener_with_certs_content(_) ->
|
||||||
ok = emqx_gateway_conf:remove_listener(
|
ok = emqx_gateway_conf:remove_listener(
|
||||||
<<"stomp">>, {<<"ssl">>, <<"default">>}),
|
<<"stomp">>, {<<"ssl">>, <<"default">>}),
|
||||||
assert_ssl_confs_files_deleted(SslConf),
|
assert_ssl_confs_files_deleted(SslConf),
|
||||||
{error, not_found} =
|
|
||||||
|
?assertMatch(
|
||||||
|
{error, {badres, #{reason := not_found}}},
|
||||||
emqx_gateway_conf:update_listener(
|
emqx_gateway_conf:update_listener(
|
||||||
<<"stomp">>, {<<"ssl">>, <<"default">>}, ?CONF_STOMP_LISTENER_SSL_2),
|
<<"stomp">>, {<<"ssl">>, <<"default">>}, ?CONF_STOMP_LISTENER_SSL_2)),
|
||||||
|
|
||||||
?assertException(
|
?assertException(
|
||||||
error, {config_not_found, [gateway, stomp, listeners, ssl, default]},
|
error, {config_not_found, [gateway, stomp, listeners, ssl, default]},
|
||||||
emqx:get_raw_config([gateway, stomp, listeners, ssl, default])
|
emqx:get_raw_config([gateway, stomp, listeners, ssl, default])
|
||||||
|
|
Loading…
Reference in New Issue