chore(gw): adapt the lates minirest and emqx-config

This commit is contained in:
JianBo He 2021-08-27 11:25:00 +08:00 committed by turtleDeng
parent dacc53facf
commit fdb41fe137
4 changed files with 80 additions and 55 deletions

View File

@ -22,7 +22,7 @@
%% callbacks for emqx_config_handler
-export([ pre_config_update/2
, post_config_update/3
, post_config_update/4
]).
%% APIs
@ -88,7 +88,10 @@ stop(Name) ->
-> ok
| {error, any()}.
update_rawconf(RawName, RawConfDiff) ->
emqx:update_config([gateway], {RawName, RawConfDiff}).
case emqx:update_config([gateway], {RawName, RawConfDiff}) of
{ok, _Result} -> ok;
{error, Reason} -> {error, Reason}
end.
%%--------------------------------------------------------------------
%% Config Handler
@ -99,8 +102,9 @@ pre_config_update({RawName, RawConfDiff}, RawConf) ->
{ok, emqx_map_lib:deep_merge(RawConf, #{RawName => RawConfDiff})}.
-spec post_config_update(emqx_config:update_request(), emqx_config:config(),
emqx_config:config()) -> ok | {ok, Result::any()} | {error, Reason::term()}.
post_config_update({RawName, _}, NewConfig, OldConfig) ->
emqx_config:config(), emqx_config:app_envs())
-> ok | {ok, Result::any()} | {error, Reason::term()}.
post_config_update({RawName, _}, NewConfig, OldConfig, _AppEnvs) ->
GwName = binary_to_existing_atom(RawName),
SubConf = maps:get(GwName, NewConfig),
case maps:get(GwName, OldConfig, undefined) of

View File

@ -29,6 +29,7 @@
%% http handlers
-export([ gateway/2
, gateway_insta/2
, gateway_insta_stats/2
]).
-define(EXAMPLE_GATEWAY_LIST,
@ -240,7 +241,7 @@ metadata(gateway_insta) ->
requestBody => schema(schema_for_gateway_conf()),
responses => #{
<<"404">> => NameNotFoundRespDef,
<<"204">> => #{description => <<"Created">>}
<<"200">> => #{description => <<"Changed">>}
}
}
};
@ -336,41 +337,45 @@ schema_for_gateway_stats() ->
%% http handlers
gateway(get, Request) ->
Params = cowboy_req:parse_qs(Request),
Status = case proplists:get_value(<<"status">>, Params) of
Params = maps:get(query_string, Request, #{}),
Status = case maps:get(<<"status">>, Params, undefined) of
undefined -> all;
S0 -> binary_to_existing_atom(S0, utf8)
end,
{200, emqx_gateway_intr:gateways(Status)}.
gateway_insta(delete, Request) ->
Name = binary_to_existing_atom(cowboy_req:binding(name, Request)),
gateway_insta(delete, #{bindings := #{name := Name0}}) ->
Name = binary_to_existing_atom(Name0),
case emqx_gateway:unload(Name) of
ok ->
{200, ok};
{error, not_found} ->
{404, <<"Not Found">>}
end;
gateway_insta(get, Request) ->
Name = binary_to_existing_atom(cowboy_req:binding(name, Request)),
gateway_insta(get, #{bindings := #{name := Name0}}) ->
Name = binary_to_existing_atom(Name0),
case emqx_gateway:lookup(Name) of
#{config := Config} ->
%% TODO: ??? RawConf or Config or RunningState ???
{200, Config};
#{config := _Config} ->
%% FIXME: Got the parsed config, but we should return rawconfig to
%% frontend
RawConf = emqx_config:fill_defaults(
emqx_config:get_root_raw([<<"gateway">>])
),
{200, emqx_map_lib:deep_get([<<"gateway">>, Name0], RawConf)};
undefined ->
{404, <<"Not Found">>}
end;
gateway_insta(post, Request) ->
Name = cowboy_req:binding(name, Request),
{ok, RawConf, _NRequest} = cowboy_req:read_body(Request),
%% XXX: Consistence ??
case emqx_gateway:update_rawconf(Name, RawConf) of
gateway_insta(put, #{body := RawConfsIn,
bindings := #{name := Name}
}) ->
%% FIXME: Cluster Consistence ??
case emqx_gateway:update_rawconf(Name, RawConfsIn) of
ok ->
{200, ok};
{200, <<"Changed">>};
{error, not_found} ->
{404, <<"Not Found">>};
{error, Reason} ->
{500, Reason}
{500, emqx_gateway_utils:stringfy(Reason)}
end.
gateway_insta_stats(get, _Req) ->

View File

@ -183,13 +183,15 @@ code_change(_OldVsn, State, _Extra) ->
{ok, State}.
detailed_gateway_info(State) ->
#{name => State#state.name,
config => State#state.config,
status => State#state.status,
created_at => State#state.created_at,
started_at => State#state.started_at,
stopped_at => State#state.stopped_at
}.
maps:filter(
fun(_, V) -> V =/= undefined end,
#{name => State#state.name,
config => State#state.config,
status => State#state.status,
created_at => State#state.created_at,
started_at => State#state.started_at,
stopped_at => State#state.stopped_at
}).
%%--------------------------------------------------------------------
%% Internal funcs
@ -226,7 +228,7 @@ do_deinit_authn(undefined) ->
ok;
do_deinit_authn(AuthnRef) ->
%% TODO:
?LOG(error, "Failed to clean authn ~p, not suppported now", [AuthnRef]).
?LOG(warning, "Failed to clean authn ~p, not suppported now", [AuthnRef]).
%case emqx_authn:delete_chain(AuthnRef) of
% ok -> ok;
% {error, {not_found, _}} ->
@ -307,33 +309,40 @@ cb_gateway_unload(State = #state{name = GwName,
cb_gateway_load(State = #state{name = GwName,
config = Config,
ctx = Ctx}) ->
Gateway = detailed_gateway_info(State),
try
AuthnRef = do_init_authn(GwName, Config),
NCtx = Ctx#{auth => AuthnRef},
#{cbkmod := CbMod} = emqx_gateway_registry:lookup(GwName),
case CbMod:on_gateway_load(Gateway, NCtx) of
{error, Reason} ->
do_deinit_authn(AuthnRef),
throw({callback_return_error, Reason});
{ok, ChildPidOrSpecs, GwState} ->
ChildPids = start_child_process(ChildPidOrSpecs),
{ok, State#state{
ctx = NCtx,
status = running,
child_pids = ChildPids,
gw_state = GwState,
stopped_at = undefined,
started_at = erlang:system_time(millisecond)
}}
end
catch
Class : Reason1 : Stk ->
?LOG(error, "Failed to load ~s gateway (~0p, ~0p) crashed: "
"{~p, ~p}, stacktrace: ~0p",
[GwName, Gateway, Ctx,
Class, Reason1, Stk]),
{error, {Class, Reason1, Stk}}
case maps:get(enable, Config, true) of
false ->
?LOG(info, "Skipp to start ~s gateway due to disabled", [GwName]);
true ->
try
AuthnRef = do_init_authn(GwName, Config),
NCtx = Ctx#{auth => AuthnRef},
#{cbkmod := CbMod} = emqx_gateway_registry:lookup(GwName),
case CbMod:on_gateway_load(Gateway, NCtx) of
{error, Reason} ->
do_deinit_authn(AuthnRef),
throw({callback_return_error, Reason});
{ok, ChildPidOrSpecs, GwState} ->
ChildPids = start_child_process(ChildPidOrSpecs),
{ok, State#state{
ctx = NCtx,
status = running,
child_pids = ChildPids,
gw_state = GwState,
stopped_at = undefined,
started_at = erlang:system_time(millisecond)
}}
end
catch
Class : Reason1 : Stk ->
?LOG(error, "Failed to load ~s gateway (~0p, ~0p) "
"crashed: {~p, ~p}, stacktrace: ~0p",
[GwName, Gateway, Ctx,
Class, Reason1, Stk]),
{error, {Class, Reason1, Stk}}
end
end.
cb_gateway_update(Config,

View File

@ -31,6 +31,9 @@
, unix_ts_to_rfc3339/2
]).
-export([ stringfy/1
]).
-export([ normalize_config/1
]).
@ -118,6 +121,10 @@ unix_ts_to_rfc3339(Key, Map) ->
emqx_rule_funcs:unix_ts_to_rfc3339(Ts, <<"millisecond">>)}
end.
-spec stringfy(term()) -> binary().
stringfy(T) ->
iolist_to_binary(io_lib:format("~0p", [T])).
-spec normalize_config(emqx_config:config())
-> list({ Type :: udp | tcp | ssl | dtls
, Name :: atom()