refactor(gw): change the on_gateway_update/3 callback params
This commit is contained in:
parent
bce130d9f9
commit
8886d60720
|
@ -30,8 +30,8 @@
|
||||||
| {ok, [Childspec :: supervisor:child_spec()], GwState :: state()}.
|
| {ok, [Childspec :: supervisor:child_spec()], GwState :: state()}.
|
||||||
|
|
||||||
%% @doc
|
%% @doc
|
||||||
-callback on_gateway_update(NewGateway :: gateway(),
|
-callback on_gateway_update(Config :: emqx_config:config(),
|
||||||
OldGateway :: gateway(),
|
Gateway :: gateway(),
|
||||||
GwState :: state())
|
GwState :: state())
|
||||||
-> ok
|
-> ok
|
||||||
| {ok, [ChildPid :: pid()], NGwState :: state()}
|
| {ok, [ChildPid :: pid()], NGwState :: state()}
|
||||||
|
|
|
@ -58,13 +58,13 @@ on_gateway_load(_Gateway = #{name := GwName,
|
||||||
|
|
||||||
{ok, ListenerPids, #{ctx => Ctx}}.
|
{ok, ListenerPids, #{ctx => Ctx}}.
|
||||||
|
|
||||||
on_gateway_update(NewGateway, OldGateway, GwState = #{ctx := Ctx}) ->
|
on_gateway_update(Config, Gateway, GwState = #{ctx := Ctx}) ->
|
||||||
GwName = maps:get(name, NewGateway),
|
GwName = maps:get(name, Gateway),
|
||||||
try
|
try
|
||||||
%% XXX: 1. How hot-upgrade the changes ???
|
%% XXX: 1. How hot-upgrade the changes ???
|
||||||
%% XXX: 2. Check the New confs first before destroy old instance ???
|
%% XXX: 2. Check the New confs first before destroy old instance ???
|
||||||
on_gateway_unload(OldGateway, GwState),
|
on_gateway_unload(Gateway, GwState),
|
||||||
on_gateway_load(NewGateway, Ctx)
|
on_gateway_load(Gateway#{config => Config}, Ctx)
|
||||||
catch
|
catch
|
||||||
Class : Reason : Stk ->
|
Class : Reason : Stk ->
|
||||||
logger:error("Failed to update ~s; "
|
logger:error("Failed to update ~s; "
|
||||||
|
|
|
@ -63,8 +63,7 @@ lookup(Name) ->
|
||||||
|
|
||||||
-spec update(gateway_name(), emqx_config:config()) -> ok | {error, any()}.
|
-spec update(gateway_name(), emqx_config:config()) -> ok | {error, any()}.
|
||||||
update(Name, Config) ->
|
update(Name, Config) ->
|
||||||
NewGateway = #{name => Name, config => Config},
|
emqx_gateway_sup:update_gateway(Name, Config).
|
||||||
emqx_gateway_sup:update_gateway(NewGateway).
|
|
||||||
|
|
||||||
-spec start(gateway_name()) -> ok | {error, any()}.
|
-spec start(gateway_name()) -> ok | {error, any()}.
|
||||||
start(Name) ->
|
start(Name) ->
|
||||||
|
|
|
@ -29,7 +29,7 @@
|
||||||
|
|
||||||
-export([ create_insta/3
|
-export([ create_insta/3
|
||||||
, remove_insta/2
|
, remove_insta/2
|
||||||
, update_insta/2
|
, update_insta/3
|
||||||
, start_insta/2
|
, start_insta/2
|
||||||
, stop_insta/2
|
, stop_insta/2
|
||||||
, list_insta/1
|
, list_insta/1
|
||||||
|
@ -72,12 +72,13 @@ remove_insta(Sup, Name) ->
|
||||||
ok = supervisor:delete_child(Sup, Name)
|
ok = supervisor:delete_child(Sup, Name)
|
||||||
end.
|
end.
|
||||||
|
|
||||||
-spec update_insta(pid(), NewGateway :: gateway()) -> ok | {error, any()}.
|
-spec update_insta(pid(), gateway_name(), emqx_config:config())
|
||||||
update_insta(Sup, NewGateway = #{name := Name}) ->
|
-> ok | {error, any()}.
|
||||||
|
update_insta(Sup, Name, Config) ->
|
||||||
case emqx_gateway_utils:find_sup_child(Sup, Name) of
|
case emqx_gateway_utils:find_sup_child(Sup, Name) of
|
||||||
false -> {error, not_found};
|
false -> {error, not_found};
|
||||||
{ok, GwInstaPid} ->
|
{ok, GwInstaPid} ->
|
||||||
emqx_gateway_insta_sup:update(GwInstaPid, NewGateway)
|
emqx_gateway_insta_sup:update(GwInstaPid, Config)
|
||||||
end.
|
end.
|
||||||
|
|
||||||
-spec start_insta(pid(), gateway_name()) -> ok | {error, any()}.
|
-spec start_insta(pid(), gateway_name()) -> ok | {error, any()}.
|
||||||
|
|
|
@ -75,9 +75,9 @@ enable(Pid) ->
|
||||||
call(Pid, enable).
|
call(Pid, enable).
|
||||||
|
|
||||||
%% @doc Update the gateway configurations
|
%% @doc Update the gateway configurations
|
||||||
-spec update(pid(), gateway()) -> ok | {error, any()}.
|
-spec update(pid(), emqx_config:config()) -> ok | {error, any()}.
|
||||||
update(Pid, NewGateway) ->
|
update(Pid, Config) ->
|
||||||
call(Pid, {update, NewGateway}).
|
call(Pid, {update, Config}).
|
||||||
|
|
||||||
call(Pid, Req) ->
|
call(Pid, Req) ->
|
||||||
gen_server:call(Pid, Req, 5000).
|
gen_server:call(Pid, Req, 5000).
|
||||||
|
@ -125,12 +125,7 @@ do_deinit_context(Ctx) ->
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
handle_call(info, _From, State = #state{gw = Gateway}) ->
|
handle_call(info, _From, State = #state{gw = Gateway}) ->
|
||||||
GwInfo = Gateway#{status => State#state.status,
|
{reply, state2info(Gateway), State};
|
||||||
created_at => State#state.created_at,
|
|
||||||
started_at => State#state.started_at,
|
|
||||||
stopped_at => State#state.stopped_at
|
|
||||||
},
|
|
||||||
{reply, GwInfo, State};
|
|
||||||
|
|
||||||
handle_call(disable, _From, State = #state{status = Status}) ->
|
handle_call(disable, _From, State = #state{status = Status}) ->
|
||||||
case Status of
|
case Status of
|
||||||
|
@ -158,32 +153,12 @@ handle_call(enable, _From, State = #state{status = Status}) ->
|
||||||
{reply, {error, already_started}, State}
|
{reply, {error, already_started}, State}
|
||||||
end;
|
end;
|
||||||
|
|
||||||
%% Stopped -> update
|
handle_call({update, Config}, _From, State) ->
|
||||||
handle_call({update, NewGateway}, _From, State = #state{
|
case cb_gateway_update(Config, State) of
|
||||||
gw = Gateway,
|
{ok, NState} ->
|
||||||
status = stopped}) ->
|
{reply, ok, NState};
|
||||||
case maps:get(name, NewGateway, undefined)
|
{error, Reason} ->
|
||||||
== maps:get(name, Gateway, undefined) of
|
{reply, {error, Reason}, State}
|
||||||
true ->
|
|
||||||
{reply, ok, State#state{gw = NewGateway}};
|
|
||||||
false ->
|
|
||||||
{reply, {error, gateway_name_not_match}, State}
|
|
||||||
end;
|
|
||||||
|
|
||||||
%% Running -> update
|
|
||||||
handle_call({update, NewGateway}, _From, State = #state{gw = Gateway,
|
|
||||||
status = running}) ->
|
|
||||||
case maps:get(name, NewGateway, undefined)
|
|
||||||
== maps:get(name, Gateway, undefined) of
|
|
||||||
true ->
|
|
||||||
case cb_gateway_update(NewGateway, State) of
|
|
||||||
{ok, NState} ->
|
|
||||||
{reply, ok, NState};
|
|
||||||
{error, Reason} ->
|
|
||||||
{reply, {error, Reason}, State}
|
|
||||||
end;
|
|
||||||
false ->
|
|
||||||
{reply, {error, gateway_name_not_match}, State}
|
|
||||||
end;
|
end;
|
||||||
|
|
||||||
handle_call(_Request, _From, State) ->
|
handle_call(_Request, _From, State) ->
|
||||||
|
@ -223,6 +198,14 @@ terminate(_Reason, State = #state{ctx = Ctx, child_pids = Pids}) ->
|
||||||
code_change(_OldVsn, State, _Extra) ->
|
code_change(_OldVsn, State, _Extra) ->
|
||||||
{ok, State}.
|
{ok, State}.
|
||||||
|
|
||||||
|
state2info(State = #state{gw = Gateway}) ->
|
||||||
|
Gateway#{
|
||||||
|
status => State#state.status,
|
||||||
|
created_at => State#state.created_at,
|
||||||
|
started_at => State#state.started_at,
|
||||||
|
stopped_at => State#state.stopped_at
|
||||||
|
}.
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Internal funcs
|
%% Internal funcs
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
@ -299,13 +282,12 @@ cb_gateway_load(State = #state{gw = Gateway = #{name := GwName},
|
||||||
{error, {Class, Reason1, Stk}}
|
{error, {Class, Reason1, Stk}}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
cb_gateway_update(NewGateway,
|
cb_gateway_update(Config,
|
||||||
State = #state{gw = Gateway = #{name := GwName},
|
State = #state{gw = #{name := GwName},
|
||||||
ctx = Ctx,
|
|
||||||
gw_state = GwState}) ->
|
gw_state = GwState}) ->
|
||||||
try
|
try
|
||||||
#{cbkmod := CbMod} = emqx_gateway_registry:lookup(GwName),
|
#{cbkmod := CbMod} = emqx_gateway_registry:lookup(GwName),
|
||||||
case CbMod:on_gateway_update(NewGateway, Gateway, GwState) of
|
case CbMod:on_gateway_update(Config, state2info(State), GwState) of
|
||||||
{error, Reason} -> throw({callback_return_error, Reason});
|
{error, Reason} -> throw({callback_return_error, Reason});
|
||||||
{ok, ChildPidOrSpecs, NGwState} ->
|
{ok, ChildPidOrSpecs, NGwState} ->
|
||||||
%% XXX: Hot-upgrade ???
|
%% XXX: Hot-upgrade ???
|
||||||
|
@ -317,9 +299,9 @@ cb_gateway_update(NewGateway,
|
||||||
end
|
end
|
||||||
catch
|
catch
|
||||||
Class : Reason1 : Stk ->
|
Class : Reason1 : Stk ->
|
||||||
logger:error("Failed to update gateway (~0p, ~0p, ~0p) crashed: "
|
logger:error("Failed to update ~s gateway to config: ~0p crashed: "
|
||||||
"{~p, ~p}, stacktrace: ~0p",
|
"{~p, ~p}, stacktrace: ~0p",
|
||||||
[NewGateway, Gateway, Ctx,
|
[GwName, Config,
|
||||||
Class, Reason1, Stk]),
|
Class, Reason1, Stk]),
|
||||||
{error, {Class, Reason1, Stk}}
|
{error, {Class, Reason1, Stk}}
|
||||||
end.
|
end.
|
||||||
|
|
|
@ -234,10 +234,6 @@ authentication() ->
|
||||||
, hoconsc:ref(emqx_enhanced_authn_scram_mnesia, config)
|
, hoconsc:ref(emqx_enhanced_authn_scram_mnesia, config)
|
||||||
]).
|
]).
|
||||||
|
|
||||||
%translations() -> [].
|
|
||||||
%
|
|
||||||
%translations(_) -> [].
|
|
||||||
|
|
||||||
gateway_common_options() ->
|
gateway_common_options() ->
|
||||||
[ {enable, t(boolean(), undefined, true)}
|
[ {enable, t(boolean(), undefined, true)}
|
||||||
, {enable_stats, t(boolean(), undefined, true)}
|
, {enable_stats, t(boolean(), undefined, true)}
|
||||||
|
|
|
@ -26,7 +26,7 @@
|
||||||
-export([ load_gateway/1
|
-export([ load_gateway/1
|
||||||
, unload_gateway/1
|
, unload_gateway/1
|
||||||
, lookup_gateway/1
|
, lookup_gateway/1
|
||||||
, update_gateway/1
|
, update_gateway/2
|
||||||
, start_gateway_insta/1
|
, start_gateway_insta/1
|
||||||
, stop_gateway_insta/1
|
, stop_gateway_insta/1
|
||||||
, list_gateway_insta/0
|
, list_gateway_insta/0
|
||||||
|
@ -74,13 +74,13 @@ lookup_gateway(GwName) ->
|
||||||
undefined
|
undefined
|
||||||
end.
|
end.
|
||||||
|
|
||||||
-spec update_gateway(gateway())
|
-spec update_gateway(gateway_name(), emqx_config:config())
|
||||||
-> ok
|
-> ok
|
||||||
| {error, any()}.
|
| {error, any()}.
|
||||||
update_gateway(NewGateway = #{name := GwName}) ->
|
update_gateway(GwName, Config) ->
|
||||||
case emqx_gateway_utils:find_sup_child(?MODULE, GwName) of
|
case emqx_gateway_utils:find_sup_child(?MODULE, GwName) of
|
||||||
{ok, GwSup} ->
|
{ok, GwSup} ->
|
||||||
emqx_gateway_gw_sup:update_insta(GwSup, NewGateway);
|
emqx_gateway_gw_sup:update_insta(GwSup, GwName, Config);
|
||||||
_ -> {error, not_found}
|
_ -> {error, not_found}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
|
|
@ -109,13 +109,13 @@ on_gateway_load(_Gateway = #{ name := GwName,
|
||||||
end, Listeners),
|
end, Listeners),
|
||||||
{ok, ListenerPids, _GwState = #{ctx => Ctx}}.
|
{ok, ListenerPids, _GwState = #{ctx => Ctx}}.
|
||||||
|
|
||||||
on_gateway_update(NewGateway, OldGateway, GwState = #{ctx := Ctx}) ->
|
on_gateway_update(Config, Gateway, GwState = #{ctx := Ctx}) ->
|
||||||
GwName = maps:get(name, NewGateway),
|
GwName = maps:get(name, Gateway),
|
||||||
try
|
try
|
||||||
%% XXX: 1. How hot-upgrade the changes ???
|
%% XXX: 1. How hot-upgrade the changes ???
|
||||||
%% XXX: 2. Check the New confs first before destroy old instance ???
|
%% XXX: 2. Check the New confs first before destroy old instance ???
|
||||||
on_gateway_unload(OldGateway, GwState),
|
on_gateway_unload(Gateway, GwState),
|
||||||
on_gateway_load(NewGateway, Ctx)
|
on_gateway_load(Gateway#{config => Config}, Ctx)
|
||||||
catch
|
catch
|
||||||
Class : Reason : Stk ->
|
Class : Reason : Stk ->
|
||||||
logger:error("Failed to update ~s; "
|
logger:error("Failed to update ~s; "
|
||||||
|
|
|
@ -77,13 +77,13 @@ on_gateway_load(_Gateway = #{ name := GwName,
|
||||||
end, Listeners),
|
end, Listeners),
|
||||||
{ok, ListenerPids, _InstaState = #{ctx => Ctx}}.
|
{ok, ListenerPids, _InstaState = #{ctx => Ctx}}.
|
||||||
|
|
||||||
on_gateway_update(NewGateway = #{name := GwName}, OldGateway,
|
on_gateway_update(Config, Gateway, GwState = #{ctx := Ctx}) ->
|
||||||
GwState = #{ctx := Ctx}) ->
|
GwName = maps:get(name, Gateway),
|
||||||
try
|
try
|
||||||
%% XXX: 1. How hot-upgrade the changes ???
|
%% XXX: 1. How hot-upgrade the changes ???
|
||||||
%% XXX: 2. Check the New confs first before destroy old instance ???
|
%% XXX: 2. Check the New confs first before destroy old instance ???
|
||||||
on_gateway_unload(OldGateway, GwState),
|
on_gateway_unload(Gateway, GwState),
|
||||||
on_gateway_load(NewGateway, Ctx)
|
on_gateway_load(Gateway#{config => Config}, Ctx)
|
||||||
catch
|
catch
|
||||||
Class : Reason : Stk ->
|
Class : Reason : Stk ->
|
||||||
logger:error("Failed to update ~s; "
|
logger:error("Failed to update ~s; "
|
||||||
|
@ -92,9 +92,9 @@ on_gateway_update(NewGateway = #{name := GwName}, OldGateway,
|
||||||
{error, {Class, Reason}}
|
{error, {Class, Reason}}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
on_gateway_unload(_Insta = #{ name := GwName,
|
on_gateway_unload(_Gateway = #{ name := GwName,
|
||||||
config := Config
|
config := Config
|
||||||
}, _GwState) ->
|
}, _GwState) ->
|
||||||
Listeners = emqx_gateway_utils:normalize_config(Config),
|
Listeners = emqx_gateway_utils:normalize_config(Config),
|
||||||
lists:foreach(fun(Lis) ->
|
lists:foreach(fun(Lis) ->
|
||||||
stop_listener(GwName, Lis)
|
stop_listener(GwName, Lis)
|
||||||
|
|
|
@ -62,13 +62,13 @@ on_gateway_load(_Gateway = #{ name := GwName,
|
||||||
%% FIXME: Assign ctx to GwState
|
%% FIXME: Assign ctx to GwState
|
||||||
{ok, ListenerPids, _GwState = #{ctx => Ctx}}.
|
{ok, ListenerPids, _GwState = #{ctx => Ctx}}.
|
||||||
|
|
||||||
on_gateway_update(NewGateway, OldGateway, GwState = #{ctx := Ctx}) ->
|
on_gateway_update(Config, Gateway, GwState = #{ctx := Ctx}) ->
|
||||||
GwName = maps:get(name, NewGateway),
|
GwName = maps:get(name, Gateway),
|
||||||
try
|
try
|
||||||
%% XXX: 1. How hot-upgrade the changes ???
|
%% XXX: 1. How hot-upgrade the changes ???
|
||||||
%% XXX: 2. Check the New confs first before destroy old state???
|
%% XXX: 2. Check the New confs first before destroy old state???
|
||||||
on_gateway_unload(OldGateway, GwState),
|
on_gateway_unload(Gateway, GwState),
|
||||||
on_gateway_load(NewGateway, Ctx)
|
on_gateway_load(Gateway#{config => Config}, Ctx)
|
||||||
catch
|
catch
|
||||||
Class : Reason : Stk ->
|
Class : Reason : Stk ->
|
||||||
logger:error("Failed to update ~s; "
|
logger:error("Failed to update ~s; "
|
||||||
|
|
Loading…
Reference in New Issue