refactor(gw): rename gateway type to name

This commit is contained in:
JianBo He 2021-08-17 11:51:52 +08:00
parent 9072a60652
commit c5a4e05418
15 changed files with 434 additions and 429 deletions

View File

@ -18,11 +18,11 @@
-define(EMQX_GATEWAY_HRL, 1).
-type instance_id() :: atom().
-type gateway_type() :: atom().
-type gateway_name() :: atom().
%% @doc The Gateway defination
-type gateway() ::
#{ type := gateway_type()
#{ name := gateway_name()
, descr => binary() | undefined
%% Appears only in creating or detailed info
, rawconf => map()

View File

@ -48,18 +48,18 @@ unreg() ->
%% emqx_gateway_registry callbacks
%%--------------------------------------------------------------------
on_gateway_load(_Gateway = #{type := GwType,
on_gateway_load(_Gateway = #{name := GwName,
rawconf := RawConf
}, Ctx) ->
Listeners = emqx_gateway_utils:normalize_rawconf(RawConf),
ListenerPids = lists:map(fun(Lis) ->
start_listener(GwType, Ctx, Lis)
start_listener(GwName, Ctx, Lis)
end, Listeners),
{ok, ListenerPids, #{ctx => Ctx}}.
on_gateway_update(NewGateway, OldGateway, GwState = #{ctx := Ctx}) ->
GwType = maps:get(type, NewGateway),
GwName = maps:get(name, NewGateway),
try
%% XXX: 1. How hot-upgrade the changes ???
%% XXX: 2. Check the New confs first before destroy old instance ???
@ -69,37 +69,37 @@ on_gateway_update(NewGateway, OldGateway, GwState = #{ctx := Ctx}) ->
Class : Reason : Stk ->
logger:error("Failed to update ~s; "
"reason: {~0p, ~0p} stacktrace: ~0p",
[GwType, Class, Reason, Stk]),
[GwName, Class, Reason, Stk]),
{error, {Class, Reason}}
end.
on_gateway_unload(_Gateway = #{ type := GwType,
on_gateway_unload(_Gateway = #{ name := GwName,
rawconf := RawConf
}, _GwState) ->
Listeners = emqx_gateway_utils:normalize_rawconf(RawConf),
lists:foreach(fun(Lis) ->
stop_listener(GwType, Lis)
stop_listener(GwName, Lis)
end, Listeners).
%%--------------------------------------------------------------------
%% Internal funcs
%%--------------------------------------------------------------------
start_listener(GwType, Ctx, {Type, ListenOn, SocketOpts, Cfg}) ->
start_listener(GwName, Ctx, {Type, ListenOn, SocketOpts, Cfg}) ->
ListenOnStr = emqx_gateway_utils:format_listenon(ListenOn),
case start_listener(GwType, Ctx, Type, ListenOn, SocketOpts, Cfg) of
case start_listener(GwName, Ctx, Type, ListenOn, SocketOpts, Cfg) of
{ok, Pid} ->
?ULOG("Start ~s:~s listener on ~s successfully.~n",
[GwType, Type, ListenOnStr]),
[GwName, Type, ListenOnStr]),
Pid;
{error, Reason} ->
?ELOG("Failed to start ~s:~s listener on ~s: ~0p~n",
[GwType, Type, ListenOnStr, Reason]),
[GwName, Type, ListenOnStr, Reason]),
throw({badconf, Reason})
end.
start_listener(GwType, Ctx, Type, ListenOn, SocketOpts, Cfg) ->
Name = name(GwType, Type),
start_listener(GwName, Ctx, Type, ListenOn, SocketOpts, Cfg) ->
Name = name(GwName, Type),
NCfg = Cfg#{
ctx => Ctx,
frame_mod => emqx_coap_frame,
@ -114,21 +114,21 @@ do_start_listener(udp, Name, ListenOn, SocketOpts, MFA) ->
do_start_listener(dtls, Name, ListenOn, SocketOpts, MFA) ->
esockd:open_dtls(Name, ListenOn, SocketOpts, MFA).
name(GwType, Type) ->
list_to_atom(lists:concat([GwType, ":", Type])).
name(GwName, Type) ->
list_to_atom(lists:concat([GwName, ":", Type])).
stop_listener(GwType, {Type, ListenOn, SocketOpts, Cfg}) ->
StopRet = stop_listener(GwType, Type, ListenOn, SocketOpts, Cfg),
stop_listener(GwName, {Type, ListenOn, SocketOpts, Cfg}) ->
StopRet = stop_listener(GwName, Type, ListenOn, SocketOpts, Cfg),
ListenOnStr = emqx_gateway_utils:format_listenon(ListenOn),
case StopRet of
ok -> ?ULOG("Stop ~s:~s listener on ~s successfully.~n",
[GwType, Type, ListenOnStr]);
[GwName, Type, ListenOnStr]);
{error, Reason} ->
?ELOG("Failed to stop ~s:~s listener on ~s: ~0p~n",
[GwType, Type, ListenOnStr, Reason])
[GwName, Type, ListenOnStr, Reason])
end,
StopRet.
stop_listener(GwType, Type, ListenOn, _SocketOpts, _Cfg) ->
Name = name(GwType, Type),
stop_listener(GwName, Type, ListenOn, _SocketOpts, _Cfg) ->
Name = name(GwName, Type),
esockd:close(Name, ListenOn).

View File

@ -30,7 +30,7 @@
]).
-spec registered_gateway() ->
[{gateway_type(), emqx_gateway_registry:descriptor()}].
[{gateway_name(), emqx_gateway_registry:descriptor()}].
registered_gateway() ->
emqx_gateway_registry:list().
@ -41,35 +41,35 @@ registered_gateway() ->
list() ->
emqx_gateway_sup:list_gateway_insta().
-spec load(gateway_type(), map())
-spec load(gateway_name(), map())
-> {ok, pid()}
| {error, any()}.
load(GwType, RawConf) ->
Gateway = #{ type => GwType
load(Name, RawConf) ->
Gateway = #{ name => Name
, descr => undefined
, rawconf => RawConf
},
emqx_gateway_sup:load_gateway(Gateway).
-spec unload(gateway_type()) -> ok | {error, any()}.
unload(GwType) ->
emqx_gateway_sup:unload_gateway(GwType).
-spec unload(gateway_name()) -> ok | {error, any()}.
unload(Name) ->
emqx_gateway_sup:unload_gateway(Name).
-spec lookup(gateway_type()) -> gateway() | undefined.
lookup(GwType) ->
emqx_gateway_sup:lookup_gateway(GwType).
-spec lookup(gateway_name()) -> gateway() | undefined.
lookup(Name) ->
emqx_gateway_sup:lookup_gateway(Name).
-spec update(gateway()) -> ok | {error, any()}.
update(NewGateway) ->
emqx_gateway_sup:update_gateway(NewGateway).
-spec start(gateway_type()) -> ok | {error, any()}.
start(GwType) ->
emqx_gateway_sup:start_gateway_insta(GwType).
-spec start(gateway_name()) -> ok | {error, any()}.
start(Name) ->
emqx_gateway_sup:start_gateway_insta(Name).
-spec stop(gateway_type()) -> ok | {error, any()}.
stop(GwType) ->
emqx_gateway_sup:stop_gateway_insta(GwType).
-spec stop(gateway_name()) -> ok | {error, any()}.
stop(Name) ->
emqx_gateway_sup:stop_gateway_insta(Name).
%%--------------------------------------------------------------------
%% Internal funcs

View File

@ -50,30 +50,31 @@ is_cmd(Fun) ->
%% Cmds
gateway(["list"]) ->
lists:foreach(fun(#{type := Type, status := Status}) ->
%% FIXME: Get the real running status
emqx_ctl:print("Gateway(type=~s, status=~s~n",
[Type, Status])
lists:foreach(fun(#{name := Name} = Gateway) ->
%% XXX: More infos: listeners?, connected?
Status = maps:get(status, Gateway, stopped),
emqx_ctl:print("Gateway(name=~s, status=~s)~n",
[Name, Status])
end, emqx_gateway:list());
gateway(["lookup", GwType]) ->
case emqx_gateway:lookup(atom(GwType)) of
gateway(["lookup", Name]) ->
case emqx_gateway:lookup(atom(Name)) of
undefined ->
emqx_ctl:print("undefined~n");
Info ->
emqx_ctl:print("~p~n", [Info])
end;
gateway(["stop", GwType]) ->
case emqx_gateway:stop(atom(GwType)) of
gateway(["stop", Name]) ->
case emqx_gateway:stop(atom(Name)) of
ok ->
emqx_ctl:print("ok~n");
{error, Reason} ->
emqx_ctl:print("Error: ~p~n", [Reason])
end;
gateway(["start", GwType]) ->
case emqx_gateway:start(atom(GwType)) of
gateway(["start", Name]) ->
case emqx_gateway:start(atom(Name)) of
ok ->
emqx_ctl:print("ok~n");
{error, Reason} ->
@ -84,60 +85,60 @@ gateway(_) ->
%% TODO: create/remove APIs
emqx_ctl:usage([ {"gateway list",
"List all gateway"}
, {"gateway lookup <GatewayType>",
"Looup a gateway detailed informations"}
, {"gateway stop <GatewayType>",
, {"gateway lookup <Name>",
"Lookup a gateway detailed informations"}
, {"gateway stop <Name>",
"Stop a gateway instance"}
, {"gateway start <GatewayType>",
, {"gateway start <Name>",
"Start a gateway instance"}
]).
'gateway-registry'(["list"]) ->
lists:foreach(
fun({GwType, #{cbkmod := CbMod}}) ->
emqx_ctl:print("Registered Type: ~s, Callback Module: ~s~n", [GwType, CbMod])
fun({Name, #{cbkmod := CbMod}}) ->
emqx_ctl:print("Registered Name: ~s, Callback Module: ~s~n", [Name, CbMod])
end,
emqx_gateway_registry:list());
'gateway-registry'(_) ->
emqx_ctl:usage([ {"gateway-registry list",
"List all registered gateway types"}
"List all registered gateways"}
]).
'gateway-clients'(["list", Type]) ->
InfoTab = emqx_gateway_cm:tabname(info, Type),
'gateway-clients'(["list", Name]) ->
InfoTab = emqx_gateway_cm:tabname(info, Name),
dump(InfoTab, client);
'gateway-clients'(["lookup", Type, ClientId]) ->
ChanTab = emqx_gateway_cm:tabname(chan, Type),
'gateway-clients'(["lookup", Name, ClientId]) ->
ChanTab = emqx_gateway_cm:tabname(chan, Name),
case ets:lookup(ChanTab, bin(ClientId)) of
[] -> emqx_ctl:print("Not Found.~n");
[Chann] ->
InfoTab = emqx_gateway_cm:tabname(info, Type),
InfoTab = emqx_gateway_cm:tabname(info, Name),
[ChannInfo] = ets:lookup(InfoTab, Chann),
print({client, ChannInfo})
end;
'gateway-clients'(["kick", Type, ClientId]) ->
case emqx_gateway_cm:kick_session(Type, bin(ClientId)) of
'gateway-clients'(["kick", Name, ClientId]) ->
case emqx_gateway_cm:kick_session(Name, bin(ClientId)) of
ok -> emqx_ctl:print("ok~n");
_ -> emqx_ctl:print("Not Found.~n")
end;
'gateway-clients'(_) ->
emqx_ctl:usage([ {"gateway-clients list <Type>",
"List all clients for a type of gateway"}
, {"gateway-clients lookup <Type> <ClientId>",
emqx_ctl:usage([ {"gateway-clients list <Name>",
"List all clients for a gateway"}
, {"gateway-clients lookup <Name> <ClientId>",
"Lookup the Client Info for specified client"}
, {"gateway-clients kick <Type> <ClientId>",
, {"gateway-clients kick <Name> <ClientId>",
"Kick out a client"}
]).
'gateway-metrics'([GatewayType]) ->
Tab = emqx_gateway_metrics:tabname(GatewayType),
'gateway-metrics'([Name]) ->
Tab = emqx_gateway_metrics:tabname(Name),
case ets:info(Tab) of
undefined ->
emqx_ctl:print("Bad Gateway Type.~n");
emqx_ctl:print("Bad Gateway Name.~n");
_ ->
lists:foreach(
fun({K, V}) ->
@ -146,7 +147,7 @@ gateway(_) ->
end;
'gateway-metrics'(_) ->
emqx_ctl:usage([ {"gateway-metrics <GatewayType>",
emqx_ctl:usage([ {"gateway-metrics <Name>",
"List all metrics for a gateway"}
]).

View File

@ -61,13 +61,13 @@
]).
-record(state, {
type :: atom(), %% Gateway Type
locker :: pid(), %% ClientId Locker for CM
registry :: pid(), %% ClientId Registry server
gwname :: gateway_name(), %% Gateway Name
locker :: pid(), %% ClientId Locker for CM
registry :: pid(), %% ClientId Registry server
chan_pmon :: emqx_pmon:pmon()
}).
-type option() :: {type, gateway_type()}.
-type option() :: {gwname, gateway_name()}.
-type options() :: list(option()).
-define(T_TAKEOVER, 15000).
@ -79,142 +79,147 @@
-spec start_link(options()) -> {ok, pid()} | ignore | {error, any()}.
start_link(Options) ->
Type = proplists:get_value(type, Options),
gen_server:start_link({local, procname(Type)}, ?MODULE, Options, []).
GwName = proplists:get_value(gwname, Options),
gen_server:start_link({local, procname(GwName)}, ?MODULE, Options, []).
procname(Type) ->
list_to_atom(lists:concat([emqx_gateway_, Type, '_cm'])).
procname(GwName) ->
list_to_atom(lists:concat([emqx_gateway_, GwName, '_cm'])).
-spec cmtabs(Type :: atom()) -> {ChanTab :: atom(),
ConnTab :: atom(),
ChannInfoTab :: atom()}.
cmtabs(Type) ->
{ tabname(chan, Type) %% Client Tabname; Record: {ClientId, Pid}
, tabname(conn, Type) %% Client ConnMod; Recrod: {{ClientId, Pid}, ConnMod}
, tabname(info, Type) %% ClientInfo Tabname; Record: {{ClientId, Pid}, ClientInfo, ClientStats}
-spec cmtabs(GwName :: gateway_name())
-> {ChanTab :: atom(),
ConnTab :: atom(),
ChannInfoTab :: atom()}.
cmtabs(GwName) ->
{ tabname(chan, GwName) %% Client Tabname; Record: {ClientId, Pid}
, tabname(conn, GwName) %% Client ConnMod; Recrod: {{ClientId, Pid}, ConnMod}
, tabname(info, GwName) %% ClientInfo Tabname; Record: {{ClientId, Pid}, ClientInfo, ClientStats}
}.
tabname(chan, Type) ->
list_to_atom(lists:concat([emqx_gateway_, Type, '_channel']));
tabname(conn, Type) ->
list_to_atom(lists:concat([emqx_gateway_, Type, '_channel_conn']));
tabname(info, Type) ->
list_to_atom(lists:concat([emqx_gateway_, Type, '_channel_info'])).
tabname(chan, GwName) ->
list_to_atom(lists:concat([emqx_gateway_, GwName, '_channel']));
tabname(conn, GwName) ->
list_to_atom(lists:concat([emqx_gateway_, GwName, '_channel_conn']));
tabname(info, GwName) ->
list_to_atom(lists:concat([emqx_gateway_, GwName, '_channel_info'])).
lockername(Type) ->
list_to_atom(lists:concat([emqx_gateway_, Type, '_locker'])).
lockername(GwName) ->
list_to_atom(lists:concat([emqx_gateway_, GwName, '_locker'])).
-spec register_channel(atom(), binary(), pid(), emqx_types:conninfo()) -> ok.
register_channel(Type, ClientId, ChanPid, #{conn_mod := ConnMod}) when is_pid(ChanPid) ->
-spec register_channel(gateway_name(),
emqx_types:clientid(),
pid(),
emqx_types:conninfo()) -> ok.
register_channel(GwName, ClientId, ChanPid, #{conn_mod := ConnMod}) when is_pid(ChanPid) ->
Chan = {ClientId, ChanPid},
true = ets:insert(tabname(chan, Type), Chan),
true = ets:insert(tabname(conn, Type), {Chan, ConnMod}),
ok = emqx_gateway_cm_registry:register_channel(Type, Chan),
cast(procname(Type), {registered, Chan}).
true = ets:insert(tabname(chan, GwName), Chan),
true = ets:insert(tabname(conn, GwName), {Chan, ConnMod}),
ok = emqx_gateway_cm_registry:register_channel(GwName, Chan),
cast(procname(GwName), {registered, Chan}).
%% @doc Unregister a channel.
-spec unregister_channel(atom(), emqx_types:clientid()) -> ok.
unregister_channel(Type, ClientId) when is_binary(ClientId) ->
true = do_unregister_channel(Type, {ClientId, self()}, cmtabs(Type)),
-spec unregister_channel(gateway_name(), emqx_types:clientid()) -> ok.
unregister_channel(GwName, ClientId) when is_binary(ClientId) ->
true = do_unregister_channel(GwName, {ClientId, self()}, cmtabs(GwName)),
ok.
%% @doc Insert/Update the channel info and stats
-spec insert_channel_info(atom(),
-spec insert_channel_info(gateway_name(),
emqx_types:clientid(),
emqx_types:infos(),
emqx_types:stats()) -> ok.
insert_channel_info(Type, ClientId, Info, Stats) ->
insert_channel_info(GwName, ClientId, Info, Stats) ->
Chan = {ClientId, self()},
true = ets:insert(tabname(info, Type), {Chan, Info, Stats}),
true = ets:insert(tabname(info, GwName), {Chan, Info, Stats}),
%%?tp(debug, insert_channel_info, #{client_id => ClientId}),
ok.
%% @doc Get info of a channel.
-spec get_chan_info(gateway_type(), emqx_types:clientid())
-spec get_chan_info(gateway_name(), emqx_types:clientid())
-> emqx_types:infos() | undefined.
get_chan_info(Type, ClientId) ->
with_channel(Type, ClientId,
get_chan_info(GwName, ClientId) ->
with_channel(GwName, ClientId,
fun(ChanPid) ->
get_chan_info(Type, ClientId, ChanPid)
get_chan_info(GwName, ClientId, ChanPid)
end).
-spec get_chan_info(gateway_type(), emqx_types:clientid(), pid())
-spec get_chan_info(gateway_name(), emqx_types:clientid(), pid())
-> emqx_types:infos() | undefined.
get_chan_info(Type, ClientId, ChanPid) when node(ChanPid) == node() ->
get_chan_info(GwName, ClientId, ChanPid) when node(ChanPid) == node() ->
Chan = {ClientId, ChanPid},
try ets:lookup_element(tabname(info, Type), Chan, 2)
try ets:lookup_element(tabname(info, GwName), Chan, 2)
catch
error:badarg -> undefined
end;
get_chan_info(Type, ClientId, ChanPid) ->
rpc_call(node(ChanPid), get_chan_info, [Type, ClientId, ChanPid]).
get_chan_info(GwName, ClientId, ChanPid) ->
rpc_call(node(ChanPid), get_chan_info, [GwName, ClientId, ChanPid]).
%% @doc Update infos of the channel.
-spec set_chan_info(gateway_type(),
-spec set_chan_info(gateway_name(),
emqx_types:clientid(),
emqx_types:infos()) -> boolean().
set_chan_info(Type, ClientId, Infos) ->
set_chan_info(Type, ClientId, self(), Infos).
set_chan_info(GwName, ClientId, Infos) ->
set_chan_info(GwName, ClientId, self(), Infos).
-spec set_chan_info(gateway_type(),
-spec set_chan_info(gateway_name(),
emqx_types:clientid(),
pid(),
emqx_types:infos()) -> boolean().
set_chan_info(Type, ClientId, ChanPid, Infos) when node(ChanPid) == node() ->
set_chan_info(GwName, ClientId, ChanPid, Infos) when node(ChanPid) == node() ->
Chan = {ClientId, ChanPid},
try ets:update_element(tabname(info, Type), Chan, {2, Infos})
try ets:update_element(tabname(info, GwName), Chan, {2, Infos})
catch
error:badarg -> false
end;
set_chan_info(Type, ClientId, ChanPid, Infos) ->
rpc_call(node(ChanPid), set_chan_info, [Type, ClientId, ChanPid, Infos]).
set_chan_info(GwName, ClientId, ChanPid, Infos) ->
rpc_call(node(ChanPid), set_chan_info, [GwName, ClientId, ChanPid, Infos]).
%% @doc Get channel's stats.
-spec get_chan_stats(gateway_type(), emqx_types:clientid())
-spec get_chan_stats(gateway_name(), emqx_types:clientid())
-> emqx_types:stats() | undefined.
get_chan_stats(Type, ClientId) ->
with_channel(Type, ClientId,
get_chan_stats(GwName, ClientId) ->
with_channel(GwName, ClientId,
fun(ChanPid) ->
get_chan_stats(Type, ClientId, ChanPid)
get_chan_stats(GwName, ClientId, ChanPid)
end).
-spec get_chan_stats(gateway_type(), emqx_types:clientid(), pid())
-spec get_chan_stats(gateway_name(), emqx_types:clientid(), pid())
-> emqx_types:stats() | undefined.
get_chan_stats(Type, ClientId, ChanPid) when node(ChanPid) == node() ->
get_chan_stats(GwName, ClientId, ChanPid) when node(ChanPid) == node() ->
Chan = {ClientId, ChanPid},
try ets:lookup_element(tabname(info, Type), Chan, 3)
try ets:lookup_element(tabname(info, GwName), Chan, 3)
catch
error:badarg -> undefined
end;
get_chan_stats(Type, ClientId, ChanPid) ->
rpc_call(node(ChanPid), get_chan_stats, [Type, ClientId, ChanPid]).
get_chan_stats(GwName, ClientId, ChanPid) ->
rpc_call(node(ChanPid), get_chan_stats, [GwName, ClientId, ChanPid]).
-spec set_chan_stats(gateway_type(),
-spec set_chan_stats(gateway_name(),
emqx_types:clientid(),
emqx_types:stats()) -> boolean().
set_chan_stats(Type, ClientId, Stats) ->
set_chan_stats(Type, ClientId, self(), Stats).
set_chan_stats(GwName, ClientId, Stats) ->
set_chan_stats(GwName, ClientId, self(), Stats).
-spec set_chan_stats(gateway_type(),
-spec set_chan_stats(gateway_name(),
emqx_types:clientid(),
pid(),
emqx_types:stats()) -> boolean().
set_chan_stats(Type, ClientId, ChanPid, Stats) when node(ChanPid) == node() ->
set_chan_stats(GwName, ClientId, ChanPid, Stats) when node(ChanPid) == node() ->
Chan = {ClientId, self()},
try ets:update_element(tabname(info, Type), Chan, {3, Stats})
try ets:update_element(tabname(info, GwName), Chan, {3, Stats})
catch
error:badarg -> false
end;
set_chan_stats(Type, ClientId, ChanPid, Stats) ->
rpc_call(node(ChanPid), set_chan_stats, [Type, ClientId, ChanPid, Stats]).
set_chan_stats(GwName, ClientId, ChanPid, Stats) ->
rpc_call(node(ChanPid), set_chan_stats, [GwName, ClientId, ChanPid, Stats]).
-spec connection_closed(gateway_type(), emqx_types:clientid()) -> true.
connection_closed(Type, ClientId) ->
-spec connection_closed(gateway_name(), emqx_types:clientid()) -> true.
connection_closed(GwName, ClientId) ->
%% XXX: Why we need to delete conn_mod tab ???
Chan = {ClientId, self()},
ets:delete_object(tabname(conn, Type), Chan).
ets:delete_object(tabname(conn, GwName), Chan).
-spec open_session(Type :: atom(), CleanStart :: boolean(),
-spec open_session(GwName :: gateway_name(),
CleanStart :: boolean(),
ClientInfo :: emqx_types:clientinfo(),
ConnInfo :: emqx_types:conninfo(),
CreateSessionFun :: fun((emqx_types:clientinfo(),
@ -226,24 +231,24 @@ connection_closed(Type, ClientId) ->
}}
| {error, any()}.
open_session(Type, CleanStart, ClientInfo, ConnInfo, CreateSessionFun) ->
open_session(Type, CleanStart, ClientInfo, ConnInfo, CreateSessionFun, emqx_session).
open_session(GwName, CleanStart, ClientInfo, ConnInfo, CreateSessionFun) ->
open_session(GwName, CleanStart, ClientInfo, ConnInfo, CreateSessionFun, emqx_session).
open_session(Type, true = _CleanStart, ClientInfo, ConnInfo, CreateSessionFun, SessionMod) ->
open_session(GwName, true = _CleanStart, ClientInfo, ConnInfo, CreateSessionFun, SessionMod) ->
Self = self(),
ClientId = maps:get(clientid, ClientInfo),
Fun = fun(_) ->
ok = discard_session(Type, ClientId),
Session = create_session(Type,
ok = discard_session(GwName, ClientId),
Session = create_session(GwName,
ClientInfo,
ConnInfo,
CreateSessionFun,
SessionMod
),
register_channel(Type, ClientId, Self, ConnInfo),
register_channel(GwName, ClientId, Self, ConnInfo),
{ok, #{session => Session, present => false}}
end,
locker_trans(Type, ClientId, Fun);
locker_trans(GwName, ClientId, Fun);
open_session(_Type, false = _CleanStart,
_ClientInfo, _ConnInfo, _CreateSessionFun, _SessionMod) ->
@ -251,13 +256,13 @@ open_session(_Type, false = _CleanStart,
{error, not_supported_now}.
%% @private
create_session(Type, ClientInfo, ConnInfo, CreateSessionFun, SessionMod) ->
create_session(GwName, ClientInfo, ConnInfo, CreateSessionFun, SessionMod) ->
try
Session = emqx_gateway_utils:apply(
CreateSessionFun,
[ClientInfo, ConnInfo]
),
ok = emqx_gateway_metrics:inc(Type, 'session.created'),
ok = emqx_gateway_metrics:inc(GwName, 'session.created'),
SessionInfo = case is_tuple(Session)
andalso element(1, Session) == session of
true -> SessionMod:info(Session);
@ -279,17 +284,17 @@ create_session(Type, ClientInfo, ConnInfo, CreateSessionFun, SessionMod) ->
end.
%% @doc Discard all the sessions identified by the ClientId.
-spec discard_session(Type :: atom(), binary()) -> ok.
discard_session(Type, ClientId) when is_binary(ClientId) ->
case lookup_channels(Type, ClientId) of
-spec discard_session(GwName :: gateway_name(), binary()) -> ok.
discard_session(GwName, ClientId) when is_binary(ClientId) ->
case lookup_channels(GwName, ClientId) of
[] -> ok;
ChanPids -> lists:foreach(fun(Pid) -> do_discard_session(Type, ClientId, Pid) end, ChanPids)
ChanPids -> lists:foreach(fun(Pid) -> do_discard_session(GwName, ClientId, Pid) end, ChanPids)
end.
%% @private
do_discard_session(Type, ClientId, Pid) ->
do_discard_session(GwName, ClientId, Pid) ->
try
discard_session(Type, ClientId, Pid)
discard_session(GwName, ClientId, Pid)
catch
_ : noproc -> % emqx_ws_connection: call
%?tp(debug, "session_already_gone", #{pid => Pid}),
@ -307,72 +312,72 @@ do_discard_session(Type, ClientId, Pid) ->
end.
%% @private
discard_session(Type, ClientId, ChanPid) when node(ChanPid) == node() ->
case get_chann_conn_mod(Type, ClientId, ChanPid) of
discard_session(GwName, ClientId, ChanPid) when node(ChanPid) == node() ->
case get_chann_conn_mod(GwName, ClientId, ChanPid) of
undefined -> ok;
ConnMod when is_atom(ConnMod) ->
ConnMod:call(ChanPid, discard, ?T_TAKEOVER)
end;
%% @private
discard_session(Type, ClientId, ChanPid) ->
rpc_call(node(ChanPid), discard_session, [Type, ClientId, ChanPid]).
discard_session(GwName, ClientId, ChanPid) ->
rpc_call(node(ChanPid), discard_session, [GwName, ClientId, ChanPid]).
-spec kick_session(gateway_type(), emqx_types:clientid())
-spec kick_session(gateway_name(), emqx_types:clientid())
-> {error, any()}
| ok.
kick_session(Type, ClientId) ->
case lookup_channels(Type, ClientId) of
kick_session(GwName, ClientId) ->
case lookup_channels(GwName, ClientId) of
[] -> {error, not_found};
[ChanPid] ->
kick_session(Type, ClientId, ChanPid);
kick_session(GwName, ClientId, ChanPid);
ChanPids ->
[ChanPid|StalePids] = lists:reverse(ChanPids),
?LOG(error, "More than one channel found: ~p", [ChanPids]),
lists:foreach(fun(StalePid) ->
catch discard_session(Type, ClientId, StalePid)
catch discard_session(GwName, ClientId, StalePid)
end, StalePids),
kick_session(Type, ClientId, ChanPid)
kick_session(GwName, ClientId, ChanPid)
end.
kick_session(Type, ClientId, ChanPid) when node(ChanPid) == node() ->
case get_chan_info(Type, ClientId, ChanPid) of
kick_session(GwName, ClientId, ChanPid) when node(ChanPid) == node() ->
case get_chan_info(GwName, ClientId, ChanPid) of
#{conninfo := #{conn_mod := ConnMod}} ->
ConnMod:call(ChanPid, kick, ?T_TAKEOVER);
undefined ->
{error, not_found}
end;
kick_session(Type, ClientId, ChanPid) ->
rpc_call(node(ChanPid), kick_session, [Type, ClientId, ChanPid]).
kick_session(GwName, ClientId, ChanPid) ->
rpc_call(node(ChanPid), kick_session, [GwName, ClientId, ChanPid]).
with_channel(Type, ClientId, Fun) ->
case lookup_channels(Type, ClientId) of
with_channel(GwName, ClientId, Fun) ->
case lookup_channels(GwName, ClientId) of
[] -> undefined;
[Pid] -> Fun(Pid);
Pids -> Fun(lists:last(Pids))
end.
%% @doc Lookup channels.
-spec(lookup_channels(atom(), emqx_types:clientid()) -> list(pid())).
lookup_channels(Type, ClientId) ->
emqx_gateway_cm_registry:lookup_channels(Type, ClientId).
-spec(lookup_channels(gateway_name(), emqx_types:clientid()) -> list(pid())).
lookup_channels(GwName, ClientId) ->
emqx_gateway_cm_registry:lookup_channels(GwName, ClientId).
get_chann_conn_mod(Type, ClientId, ChanPid) when node(ChanPid) == node() ->
get_chann_conn_mod(GwName, ClientId, ChanPid) when node(ChanPid) == node() ->
Chan = {ClientId, ChanPid},
try [ConnMod] = ets:lookup_element(tabname(conn, Type), Chan, 2), ConnMod
try [ConnMod] = ets:lookup_element(tabname(conn, GwName), Chan, 2), ConnMod
catch
error:badarg -> undefined
end;
get_chann_conn_mod(Type, ClientId, ChanPid) ->
rpc_call(node(ChanPid), get_chann_conn_mod, [Type, ClientId, ChanPid]).
get_chann_conn_mod(GwName, ClientId, ChanPid) ->
rpc_call(node(ChanPid), get_chann_conn_mod, [GwName, ClientId, ChanPid]).
%% Locker
locker_trans(_Type, undefined, Fun) ->
Fun([]);
locker_trans(Type, ClientId, Fun) ->
Locker = lockername(Type),
locker_trans(GwName, ClientId, Fun) ->
Locker = lockername(GwName),
case locker_lock(Locker, ClientId) of
{true, Nodes} ->
try Fun(Nodes) after locker_unlock(Locker, ClientId) end;
@ -401,27 +406,27 @@ cast(Name, Msg) ->
%%--------------------------------------------------------------------
init(Options) ->
Type = proplists:get_value(type, Options),
GwName = proplists:get_value(gwname, Options),
TabOpts = [public, {write_concurrency, true}],
{ChanTab, ConnTab, InfoTab} = cmtabs(Type),
{ChanTab, ConnTab, InfoTab} = cmtabs(GwName),
ok = emqx_tables:new(ChanTab, [bag, {read_concurrency, true}|TabOpts]),
ok = emqx_tables:new(ConnTab, [bag | TabOpts]),
ok = emqx_tables:new(InfoTab, [set, compressed | TabOpts]),
%% Start link cm-registry process
%% XXX: Should I hang it under a higher level supervisor?
{ok, Registry} = emqx_gateway_cm_registry:start_link(Type),
{ok, Registry} = emqx_gateway_cm_registry:start_link(GwName),
%% Start locker process
{ok, Locker} = ekka_locker:start_link(lockername(Type)),
{ok, Locker} = ekka_locker:start_link(lockername(GwName)),
%% Interval update stats
%% TODO: v0.2
%ok = emqx_stats:update_interval(chan_stats, fun ?MODULE:stats_fun/0),
{ok, #state{type = Type,
{ok, #state{gwname = GwName,
locker = Locker,
registry = Registry,
chan_pmon = emqx_pmon:new()}}.
@ -438,12 +443,12 @@ handle_cast(_Msg, State) ->
{noreply, State}.
handle_info({'DOWN', _MRef, process, Pid, _Reason},
State = #state{type = Type, chan_pmon = PMon}) ->
State = #state{gwname = GwName, chan_pmon = PMon}) ->
ChanPids = [Pid | emqx_misc:drain_down(?DEFAULT_BATCH_SIZE)],
{Items, PMon1} = emqx_pmon:erase_all(ChanPids, PMon),
CmTabs = cmtabs(Type),
ok = emqx_pool:async_submit(fun do_unregister_channel_task/3, [Items, Type, CmTabs]),
CmTabs = cmtabs(GwName),
ok = emqx_pool:async_submit(fun do_unregister_channel_task/3, [Items, GwName, CmTabs]),
{noreply, State#state{chan_pmon = PMon1}};
handle_info(_Info, State) ->
@ -455,18 +460,18 @@ terminate(_Reason, _State) ->
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
do_unregister_channel_task(Items, Type, CmTabs) ->
do_unregister_channel_task(Items, GwName, CmTabs) ->
lists:foreach(
fun({ChanPid, ClientId}) ->
do_unregister_channel(Type, {ClientId, ChanPid}, CmTabs)
do_unregister_channel(GwName, {ClientId, ChanPid}, CmTabs)
end, Items).
%%--------------------------------------------------------------------
%% Internal funcs
%%--------------------------------------------------------------------
do_unregister_channel(Type, Chan, {ChanTab, ConnTab, InfoTab}) ->
ok = emqx_gateway_cm_registry:unregister_channel(Type, Chan),
do_unregister_channel(GwName, Chan, {ChanTab, ConnTab, InfoTab}) ->
ok = emqx_gateway_cm_registry:unregister_channel(GwName, Chan),
true = ets:delete(ConnTab, Chan),
true = ets:delete(InfoTab, Chan),
ets:delete_object(ChanTab, Chan).

View File

@ -27,10 +27,8 @@
%% configuration, register devices and other common operations.
%%
-type context() ::
#{ %% Gateway Instance ID
instid := instance_id()
%% Gateway ID
, type := gateway_type()
#{ %% Gateway Name
gwname := gateway_name()
%% Autenticator
, auth := emqx_authn:chain_id() | undefined
%% The ConnectionManager PID
@ -98,41 +96,43 @@ authenticate(_Ctx, ClientInfo) ->
}}
| {error, any()}.
open_session(Ctx, CleanStart, ClientInfo, ConnInfo, CreateSessionFun) ->
open_session(Ctx, CleanStart, ClientInfo, ConnInfo, CreateSessionFun, emqx_session).
open_session(Ctx, CleanStart, ClientInfo, ConnInfo,
CreateSessionFun, emqx_session).
open_session(Ctx, false, ClientInfo, ConnInfo, CreateSessionFun, SessionMod) ->
logger:warning("clean_start=false is not supported now, "
"fallback to clean_start mode"),
open_session(Ctx, true, ClientInfo, ConnInfo, CreateSessionFun, SessionMod);
open_session(_Ctx = #{type := Type},
open_session(_Ctx = #{gwname := GwName},
CleanStart, ClientInfo, ConnInfo, CreateSessionFun, SessionMod) ->
emqx_gateway_cm:open_session(Type, CleanStart,
ClientInfo, ConnInfo, CreateSessionFun, SessionMod).
emqx_gateway_cm:open_session(GwName, CleanStart,
ClientInfo, ConnInfo,
CreateSessionFun, SessionMod).
-spec insert_channel_info(context(),
emqx_types:clientid(),
emqx_types:infos(),
emqx_types:stats()) -> ok.
insert_channel_info(_Ctx = #{type := Type}, ClientId, Infos, Stats) ->
emqx_gateway_cm:insert_channel_info(Type, ClientId, Infos, Stats).
insert_channel_info(_Ctx = #{gwname := GwName}, ClientId, Infos, Stats) ->
emqx_gateway_cm:insert_channel_info(GwName, ClientId, Infos, Stats).
%% @doc Set the Channel Info to the ConnectionManager for this client
-spec set_chan_info(context(),
emqx_types:clientid(),
emqx_types:infos()) -> boolean().
set_chan_info(_Ctx = #{type := Type}, ClientId, Infos) ->
emqx_gateway_cm:set_chan_info(Type, ClientId, Infos).
set_chan_info(_Ctx = #{gwname := GwName}, ClientId, Infos) ->
emqx_gateway_cm:set_chan_info(GwName, ClientId, Infos).
-spec set_chan_stats(context(),
emqx_types:clientid(),
emqx_types:stats()) -> boolean().
set_chan_stats(_Ctx = #{type := Type}, ClientId, Stats) ->
emqx_gateway_cm:set_chan_stats(Type, ClientId, Stats).
set_chan_stats(_Ctx = #{gwname := GwName}, ClientId, Stats) ->
emqx_gateway_cm:set_chan_stats(GwName, ClientId, Stats).
-spec connection_closed(context(), emqx_types:clientid()) -> boolean().
connection_closed(_Ctx = #{type := Type}, ClientId) ->
emqx_gateway_cm:connection_closed(Type, ClientId).
connection_closed(_Ctx = #{gwname := GwName}, ClientId) ->
emqx_gateway_cm:connection_closed(GwName, ClientId).
-spec authorize(context(), emqx_types:clientinfo(),
emqx_types:pubsub(), emqx_types:topic())
@ -140,11 +140,11 @@ connection_closed(_Ctx = #{type := Type}, ClientId) ->
authorize(_Ctx, ClientInfo, PubSub, Topic) ->
emqx_access_control:authorize(ClientInfo, PubSub, Topic).
metrics_inc(_Ctx = #{type := Type}, Name) ->
emqx_gateway_metrics:inc(Type, Name).
metrics_inc(_Ctx = #{gwname := GwName}, Name) ->
emqx_gateway_metrics:inc(GwName, Name).
metrics_inc(_Ctx = #{type := Type}, Name, Oct) ->
emqx_gateway_metrics:inc(Type, Name, Oct).
metrics_inc(_Ctx = #{gwname := GwName}, Name, Oct) ->
emqx_gateway_metrics:inc(GwName, Name, Oct).
%%--------------------------------------------------------------------
%% Internal funcs

View File

@ -42,18 +42,18 @@
%% APIs
%%--------------------------------------------------------------------
start_link(Type) ->
supervisor:start_link({local, Type}, ?MODULE, [Type]).
start_link(GwName) ->
supervisor:start_link({local, GwName}, ?MODULE, [GwName]).
-spec create_insta(pid(), gateway(), map()) -> {ok, GwInstaPid :: pid()} | {error, any()}.
create_insta(Sup, Gateway = #{type := GwType}, GwDscrptr) ->
case emqx_gateway_utils:find_sup_child(Sup, GwType) of
create_insta(Sup, Gateway = #{name := Name}, GwDscrptr) ->
case emqx_gateway_utils:find_sup_child(Sup, Name) of
{ok, _GwInstaPid} -> {error, alredy_existed};
false ->
Ctx = ctx(Sup, GwType),
Ctx = ctx(Sup, Name),
%%
ChildSpec = emqx_gateway_utils:childspec(
GwType,
Name,
worker,
emqx_gateway_insta_sup,
[Gateway, Ctx, GwDscrptr]
@ -63,34 +63,34 @@ create_insta(Sup, Gateway = #{type := GwType}, GwDscrptr) ->
)
end.
-spec remove_insta(pid(), GwType :: gateway_type()) -> ok | {error, any()}.
remove_insta(Sup, GwType) ->
case emqx_gateway_utils:find_sup_child(Sup, GwType) of
-spec remove_insta(pid(), Name :: gateway_name()) -> ok | {error, any()}.
remove_insta(Sup, Name) ->
case emqx_gateway_utils:find_sup_child(Sup, Name) of
false -> ok;
{ok, _GwInstaPid} ->
ok = supervisor:terminate_child(Sup, GwType),
ok = supervisor:delete_child(Sup, GwType)
ok = supervisor:terminate_child(Sup, Name),
ok = supervisor:delete_child(Sup, Name)
end.
-spec update_insta(pid(), NewGateway :: gateway()) -> ok | {error, any()}.
update_insta(Sup, NewGateway = #{type := GwType}) ->
case emqx_gateway_utils:find_sup_child(Sup, GwType) of
update_insta(Sup, NewGateway = #{name := Name}) ->
case emqx_gateway_utils:find_sup_child(Sup, Name) of
false -> {error, not_found};
{ok, GwInstaPid} ->
emqx_gateway_insta_sup:update(GwInstaPid, NewGateway)
end.
-spec start_insta(pid(), gateway_type()) -> ok | {error, any()}.
start_insta(Sup, GwType) ->
case emqx_gateway_utils:find_sup_child(Sup, GwType) of
-spec start_insta(pid(), gateway_name()) -> ok | {error, any()}.
start_insta(Sup, Name) ->
case emqx_gateway_utils:find_sup_child(Sup, Name) of
false -> {error, not_found};
{ok, GwInstaPid} ->
emqx_gateway_insta_sup:enable(GwInstaPid)
end.
-spec stop_insta(pid(), gateway_type()) -> ok | {error, any()}.
stop_insta(Sup, GwType) ->
case emqx_gateway_utils:find_sup_child(Sup, GwType) of
-spec stop_insta(pid(), gateway_name()) -> ok | {error, any()}.
stop_insta(Sup, Name) ->
case emqx_gateway_utils:find_sup_child(Sup, Name) of
false -> {error, not_found};
{ok, GwInstaPid} ->
emqx_gateway_insta_sup:disable(GwInstaPid)
@ -99,33 +99,31 @@ stop_insta(Sup, GwType) ->
-spec list_insta(pid()) -> [gateway()].
list_insta(Sup) ->
lists:filtermap(
fun({GwType, GwInstaPid, _Type, _Mods}) ->
is_gateway_insta_id(GwType)
fun({Name, GwInstaPid, _Type, _Mods}) ->
is_gateway_insta_id(Name)
andalso {true, emqx_gateway_insta_sup:info(GwInstaPid)}
end, supervisor:which_children(Sup)).
%% Supervisor callback
%% @doc Initialize Top Supervisor for a Protocol
init([Type]) ->
init([GwName]) ->
SupFlags = #{ strategy => one_for_one
, intensity => 10
, period => 60
},
CmOpts = [{type, Type}],
CmOpts = [{gwname, GwName}],
CM = emqx_gateway_utils:childspec(worker, emqx_gateway_cm, [CmOpts]),
Metrics = emqx_gateway_utils:childspec(worker, emqx_gateway_metrics, [Type]),
Metrics = emqx_gateway_utils:childspec(worker, emqx_gateway_metrics, [GwName]),
{ok, {SupFlags, [CM, Metrics]}}.
%%--------------------------------------------------------------------
%% Internal funcs
%%--------------------------------------------------------------------
ctx(Sup, GwType) ->
{_, Type} = erlang:process_info(Sup, registered_name),
ctx(Sup, Name) ->
{ok, CM} = emqx_gateway_utils:find_sup_child(Sup, emqx_gateway_cm),
#{ instid => GwType
, type => Type
#{ gwname => Name
, cm => CM
}.

View File

@ -86,8 +86,8 @@ call(Pid, Req) ->
init([Gateway, Ctx0, _GwDscrptr]) ->
process_flag(trap_exit, true),
#{type := GwType, rawconf := RawConf} = Gateway,
Ctx = do_init_context(GwType, RawConf, Ctx0),
#{name := GwName, rawconf := RawConf} = Gateway,
Ctx = do_init_context(GwName, RawConf, Ctx0),
State = #state{
gw = Gateway,
ctx = Ctx,
@ -102,11 +102,11 @@ init([Gateway, Ctx0, _GwDscrptr]) ->
{ok, NState}
end.
do_init_context(GwType, RawConf, Ctx) ->
do_init_context(GwName, RawConf, Ctx) ->
Auth = case maps:get(authentication, RawConf, #{enable => false}) of
#{enable := true,
authenticators := AuthCfgs} when is_list(AuthCfgs) ->
create_authenticators_for_gateway_insta(GwType, AuthCfgs);
create_authenticators_for_gateway_insta(GwName, AuthCfgs);
_ ->
undefined
end,
@ -116,8 +116,8 @@ do_deinit_context(Ctx) ->
cleanup_authenticators_for_gateway_insta(maps:get(auth, Ctx)),
ok.
handle_call(info, _From, State = #state{gw = Gateway}) ->
{reply, Gateway, State};
handle_call(info, _From, State = #state{gw = Gateway, status = Status}) ->
{reply, Gateway#{status => Status}, State};
handle_call(disable, _From, State = #state{status = Status}) ->
case Status of
@ -146,21 +146,22 @@ handle_call(enable, _From, State = #state{status = Status}) ->
end;
%% Stopped -> update
handle_call({update, NewGateway}, _From, State = #state{gw = Gateway,
status = stopped}) ->
case maps:get(type, NewGateway, undefined)
== maps:get(type, Gateway, undefined) of
handle_call({update, NewGateway}, _From, State = #state{
gw = Gateway,
status = stopped}) ->
case maps:get(name, NewGateway, undefined)
== maps:get(name, Gateway, undefined) of
true ->
{reply, ok, State#state{gw = NewGateway}};
false ->
{reply, {error, gateway_type_not_match}, State}
{reply, {error, gateway_name_not_match}, State}
end;
%% Running -> update
handle_call({update, NewGateway}, _From, State = #state{gw = Gateway,
status = running}) ->
case maps:get(type, NewGateway, undefined)
== maps:get(type, Gateway, undefined) of
case maps:get(name, NewGateway, undefined)
== maps:get(name, Gateway, undefined) of
true ->
case cb_gateway_update(NewGateway, State) of
{ok, NState} ->
@ -169,7 +170,7 @@ handle_call({update, NewGateway}, _From, State = #state{gw = Gateway,
{reply, {error, Reason}, State}
end;
false ->
{reply, {error, gateway_type_not_match}, State}
{reply, {error, gateway_name_not_match}, State}
end;
handle_call(_Request, _From, State) ->
@ -215,8 +216,8 @@ code_change(_OldVsn, State, _Extra) ->
%% @doc AuthCfgs is a array of authenticatior configurations,
%% see: emqx_authn_schema:authenticators/1
create_authenticators_for_gateway_insta(GwType, AuthCfgs) ->
ChainId = atom_to_binary(GwType, utf8),
create_authenticators_for_gateway_insta(GwName, AuthCfgs) ->
ChainId = atom_to_binary(GwName, utf8),
case emqx_authn:create_chain(#{id => ChainId}) of
{ok, _ChainInfo} ->
Results = lists:map(fun(AuthCfg = #{name := Name}) ->
@ -250,10 +251,10 @@ cleanup_authenticators_for_gateway_insta(ChainId) ->
"reason: ~p", [ChainId, Reason])
end.
cb_gateway_unload(State = #state{gw = Gateway = #{type := GwType},
cb_gateway_unload(State = #state{gw = Gateway = #{name := GwName},
gw_state = GwState}) ->
try
#{cbkmod := CbMod} = emqx_gateway_registry:lookup(GwType),
#{cbkmod := CbMod} = emqx_gateway_registry:lookup(GwName),
CbMod:on_gateway_unload(Gateway, GwState),
{ok, State#state{child_pids = [],
gw_state = undefined,
@ -267,10 +268,10 @@ cb_gateway_unload(State = #state{gw = Gateway = #{type := GwType},
{error, {Class, Reason, Stk}}
end.
cb_gateway_load(State = #state{gw = Gateway = #{type := GwType},
cb_gateway_load(State = #state{gw = Gateway = #{name := GwName},
ctx = Ctx}) ->
try
#{cbkmod := CbMod} = emqx_gateway_registry:lookup(GwType),
#{cbkmod := CbMod} = emqx_gateway_registry:lookup(GwName),
case CbMod:on_gateway_load(Gateway, Ctx) of
{error, Reason} -> throw({callback_return_error, Reason});
{ok, ChildPidOrSpecs, GwState} ->
@ -285,17 +286,17 @@ cb_gateway_load(State = #state{gw = Gateway = #{type := GwType},
Class : Reason1 : Stk ->
logger:error("Failed to load ~s gateway (~0p, ~0p) crashed: "
"{~p, ~p}, stacktrace: ~0p",
[GwType, Gateway, Ctx,
[GwName, Gateway, Ctx,
Class, Reason1, Stk]),
{error, {Class, Reason1, Stk}}
end.
cb_gateway_update(NewGateway,
State = #state{gw = Gateway = #{type := GwType},
State = #state{gw = Gateway = #{name := GwName},
ctx = Ctx,
gw_state = GwState}) ->
try
#{cbkmod := CbMod} = emqx_gateway_registry:lookup(GwType),
#{cbkmod := CbMod} = emqx_gateway_registry:lookup(GwName),
case CbMod:on_gateway_update(NewGateway, Gateway, GwState) of
{error, Reason} -> throw({callback_return_error, Reason});
{ok, ChildPidOrSpecs, NGwState} ->

View File

@ -47,36 +47,36 @@
%% APIs
%%--------------------------------------------------------------------
start_link(Type) ->
gen_server:start_link(?MODULE, [Type], []).
start_link(GwName) ->
gen_server:start_link(?MODULE, [GwName], []).
-spec inc(gateway_type(), atom()) -> ok.
inc(Type, Name) ->
inc(Type, Name, 1).
-spec inc(gateway_name(), atom()) -> ok.
inc(GwName, Name) ->
inc(GwName, Name, 1).
-spec inc(gateway_type(), atom(), integer()) -> ok.
inc(Type, Name, Oct) ->
ets:update_counter(tabname(Type), Name, {2, Oct}, {Name, 0}),
-spec inc(gateway_name(), atom(), integer()) -> ok.
inc(GwName, Name, Oct) ->
ets:update_counter(tabname(GwName), Name, {2, Oct}, {Name, 0}),
ok.
-spec dec(gateway_type(), atom()) -> ok.
dec(Type, Name) ->
inc(Type, Name, -1).
-spec dec(gateway_name(), atom()) -> ok.
dec(GwName, Name) ->
inc(GwName, Name, -1).
-spec dec(gateway_type(), atom(), non_neg_integer()) -> ok.
dec(Type, Name, Oct) ->
inc(Type, Name, -Oct).
-spec dec(gateway_name(), atom(), non_neg_integer()) -> ok.
dec(GwName, Name, Oct) ->
inc(GwName, Name, -Oct).
tabname(Type) ->
list_to_atom(lists:concat([emqx_gateway_, Type, '_metrics'])).
tabname(GwName) ->
list_to_atom(lists:concat([emqx_gateway_, GwName, '_metrics'])).
%%--------------------------------------------------------------------
%% gen_server callbacks
%%--------------------------------------------------------------------
init([Type]) ->
init([GwName]) ->
TabOpts = [public, {write_concurrency, true}],
ok = emqx_tables:new(tabname(Type), [set|TabOpts]),
ok = emqx_tables:new(tabname(GwName), [set|TabOpts]),
{ok, #state{}}.
handle_call(_Request, _From, State) ->

View File

@ -14,7 +14,7 @@
%% limitations under the License.
%%--------------------------------------------------------------------
%% @doc The Registry Centre of Gateway Type
%% @doc The Registry Centre of Gateway
-module(emqx_gateway_registry).
-include("include/emqx_gateway.hrl").
@ -42,7 +42,7 @@
]).
-record(state, {
reged = #{} :: #{ gateway_type() => descriptor() }
reged = #{} :: #{ gateway_name() => descriptor() }
}).
-type registry_options() :: [registry_option()].
@ -64,33 +64,33 @@ start_link() ->
%% Mgmt
%%--------------------------------------------------------------------
-spec reg(gateway_type(), registry_options())
-spec reg(gateway_name(), registry_options())
-> ok
| {error, any()}.
reg(Type, RgOpts) ->
CbMod = proplists:get_value(cbkmod, RgOpts, Type),
reg(Name, RgOpts) ->
CbMod = proplists:get_value(cbkmod, RgOpts, Name),
Dscrptr = #{ cbkmod => CbMod
, rgopts => RgOpts
},
call({reg, Type, Dscrptr}).
call({reg, Name, Dscrptr}).
-spec unreg(gateway_type()) -> ok | {error, any()}.
unreg(Type) ->
-spec unreg(gateway_name()) -> ok | {error, any()}.
unreg(Name) ->
%% TODO: Checking ALL INSTACE HAS STOPPED
call({unreg, Type}).
call({unreg, Name}).
%% TODO:
%unreg(Type, Force) ->
% call({unreg, Type, Froce}).
%unreg(Name, Force) ->
% call({unreg, Name, Froce}).
%% @doc Return all registered protocol gateway implementation
-spec list() -> [{gateway_type(), descriptor()}].
-spec list() -> [{gateway_name(), descriptor()}].
list() ->
call(all).
-spec lookup(gateway_type()) -> undefined | descriptor().
lookup(Type) ->
call({lookup, Type}).
-spec lookup(gateway_name()) -> undefined | descriptor().
lookup(Name) ->
call({lookup, Name}).
call(Req) ->
gen_server:call(?MODULE, Req, 5000).
@ -104,29 +104,29 @@ init([]) ->
process_flag(trap_exit, true),
{ok, #state{reged = #{}}}.
handle_call({reg, Type, Dscrptr}, _From, State = #state{reged = Gateways}) ->
case maps:get(Type, Gateways, notfound) of
handle_call({reg, Name, Dscrptr}, _From, State = #state{reged = Gateways}) ->
case maps:get(Name, Gateways, notfound) of
notfound ->
NGateways = maps:put(Type, Dscrptr, Gateways),
NGateways = maps:put(Name, Dscrptr, Gateways),
{reply, ok, State#state{reged = NGateways}};
_ ->
{reply, {error, already_existed}, State}
end;
handle_call({unreg, Type}, _From, State = #state{reged = Gateways}) ->
case maps:get(Type, Gateways, undefined) of
handle_call({unreg, Name}, _From, State = #state{reged = Gateways}) ->
case maps:get(Name, Gateways, undefined) of
undefined ->
{reply, ok, State};
_ ->
_ = emqx_gateway_sup:unload_gateway(Type),
{reply, ok, State#state{reged = maps:remove(Type, Gateways)}}
_ = emqx_gateway_sup:unload_gateway(Name),
{reply, ok, State#state{reged = maps:remove(Name, Gateways)}}
end;
handle_call(all, _From, State = #state{reged = Gateways}) ->
{reply, maps:to_list(Gateways), State};
handle_call({lookup, Type}, _From, State = #state{reged = Gateways}) ->
Reply = maps:get(Type, Gateways, undefined),
handle_call({lookup, Name}, _From, State = #state{reged = Gateways}) ->
Reply = maps:get(Name, Gateways, undefined),
{reply, Reply, State};
handle_call(Req, _From, State) ->

View File

@ -44,27 +44,27 @@ start_link() ->
-spec load_gateway(gateway()) -> {ok, pid()} | {error, any()}.
load_gateway(Gateway = #{type := GwType}) ->
case emqx_gateway_registry:lookup(GwType) of
undefined -> {error, {unknown_gateway_type, GwType}};
load_gateway(Gateway = #{name := GwName}) ->
case emqx_gateway_registry:lookup(GwName) of
undefined -> {error, {unknown_gateway_name, GwName}};
GwDscrptr ->
{ok, GwSup} = ensure_gateway_suptree_ready(GwType),
{ok, GwSup} = ensure_gateway_suptree_ready(GwName),
emqx_gateway_gw_sup:create_insta(GwSup, Gateway, GwDscrptr)
end.
-spec unload_gateway(gateway_type()) -> ok | {error, not_found}.
unload_gateway(GwType) ->
case lists:keyfind(GwType, 1, supervisor:which_children(?MODULE)) of
-spec unload_gateway(gateway_name()) -> ok | {error, not_found}.
unload_gateway(GwName) ->
case lists:keyfind(GwName, 1, supervisor:which_children(?MODULE)) of
false -> {error, not_found};
_ ->
_ = supervisor:terminate_child(?MODULE, GwType),
_ = supervisor:delete_child(?MODULE, GwType),
_ = supervisor:terminate_child(?MODULE, GwName),
_ = supervisor:delete_child(?MODULE, GwName),
ok
end.
-spec lookup_gateway(gateway_type()) -> gateway() | undefined.
lookup_gateway(GwType) ->
case search_gateway_insta_proc(GwType) of
-spec lookup_gateway(gateway_name()) -> gateway() | undefined.
lookup_gateway(GwName) ->
case search_gateway_insta_proc(GwName) of
{ok, {_, GwInstaPid}} ->
emqx_gateway_insta_sup:info(GwInstaPid);
_ ->
@ -74,25 +74,25 @@ lookup_gateway(GwType) ->
-spec update_gateway(gateway())
-> ok
| {error, any()}.
update_gateway(NewGateway = #{type := GwType}) ->
case emqx_gateway_utils:find_sup_child(?MODULE, GwType) of
update_gateway(NewGateway = #{name := GwName}) ->
case emqx_gateway_utils:find_sup_child(?MODULE, GwName) of
{ok, GwSup} ->
emqx_gateway_gw_sup:update_insta(GwSup, NewGateway);
_ -> {error, not_found}
end.
start_gateway_insta(GwType) ->
case search_gateway_insta_proc(GwType) of
start_gateway_insta(GwName) ->
case search_gateway_insta_proc(GwName) of
{ok, {GwSup, _}} ->
emqx_gateway_gw_sup:start_insta(GwSup, GwType);
emqx_gateway_gw_sup:start_insta(GwSup, GwName);
_ -> {error, not_found}
end.
-spec stop_gateway_insta(gateway_type()) -> ok | {error, any()}.
stop_gateway_insta(GwType) ->
case search_gateway_insta_proc(GwType) of
-spec stop_gateway_insta(gateway_name()) -> ok | {error, any()}.
stop_gateway_insta(GwName) ->
case search_gateway_insta_proc(GwName) of
{ok, {GwSup, _}} ->
emqx_gateway_gw_sup:stop_insta(GwSup, GwType);
emqx_gateway_gw_sup:stop_insta(GwSup, GwName);
_ -> {error, not_found}
end.
@ -103,9 +103,9 @@ list_gateway_insta() ->
emqx_gateway_gw_sup:list_insta(SupId)
end, list_started_gateway())).
-spec list_started_gateway() -> [gateway_type()].
-spec list_started_gateway() -> [gateway_name()].
list_started_gateway() ->
started_gateway_type().
started_gateway().
%% Supervisor callback
@ -122,14 +122,14 @@ init([]) ->
%% Internal funcs
%%--------------------------------------------------------------------
ensure_gateway_suptree_ready(GwType) ->
case lists:keyfind(GwType, 1, supervisor:which_children(?MODULE)) of
ensure_gateway_suptree_ready(GwName) ->
case lists:keyfind(GwName, 1, supervisor:which_children(?MODULE)) of
false ->
ChildSpec = emqx_gateway_utils:childspec(
GwType,
GwName,
supervisor,
emqx_gateway_gw_sup,
[GwType]
[GwName]
),
emqx_gateway_utils:supervisor_ret(
supervisor:start_child(?MODULE, ChildSpec)
@ -150,7 +150,7 @@ search_gateway_insta_proc(InstaId, [SupPid|More]) ->
search_gateway_insta_proc(InstaId, More)
end.
started_gateway_type() ->
started_gateway() ->
lists:filtermap(
fun({Id, _, _, _}) ->
is_a_gateway_id(Id) andalso {true, Id}

View File

@ -47,9 +47,9 @@ unreg() ->
%% emqx_gateway_registry callbacks
%%--------------------------------------------------------------------
start_grpc_server(_GwType, undefined) ->
start_grpc_server(_GwName, undefined) ->
undefined;
start_grpc_server(GwType, Options = #{bind := ListenOn}) ->
start_grpc_server(GwName, Options = #{bind := ListenOn}) ->
Services = #{protos => [emqx_exproto_pb],
services => #{
'emqx.exproto.v1.ConnectionAdapter' => emqx_exproto_gsvr}
@ -59,12 +59,12 @@ start_grpc_server(GwType, Options = #{bind := ListenOn}) ->
SslOpts ->
[{ssl_options, SslOpts}]
end,
_ = grpc:start_server(GwType, ListenOn, Services, SvrOptions),
?ULOG("Start ~s gRPC server on ~p successfully.~n", [GwType, ListenOn]).
_ = grpc:start_server(GwName, ListenOn, Services, SvrOptions),
?ULOG("Start ~s gRPC server on ~p successfully.~n", [GwName, ListenOn]).
start_grpc_client_channel(_GwType, undefined) ->
undefined;
start_grpc_client_channel(GwType, Options = #{address := UriStr}) ->
start_grpc_client_channel(GwName, Options = #{address := UriStr}) ->
UriMap = uri_string:parse(UriStr),
Scheme = maps:get(scheme, UriMap),
Host = maps:get(host, UriMap),
@ -81,36 +81,36 @@ start_grpc_client_channel(GwType, Options = #{address := UriStr}) ->
transport_opts => SslOpts}};
_ -> #{}
end,
grpc_client_sup:create_channel_pool(GwType, SvrAddr, ClientOpts).
grpc_client_sup:create_channel_pool(GwName, SvrAddr, ClientOpts).
on_gateway_load(_Gateway = #{ type := GwType,
on_gateway_load(_Gateway = #{ name := GwName,
rawconf := RawConf
}, Ctx) ->
%% XXX: How to monitor it ?
%% Start grpc client pool & client channel
PoolName = pool_name(GwType),
PoolName = pool_name(GwName),
PoolSize = emqx_vm:schedulers() * 2,
{ok, _} = emqx_pool_sup:start_link(PoolName, hash, PoolSize,
{emqx_exproto_gcli, start_link, []}),
_ = start_grpc_client_channel(GwType, maps:get(handler, RawConf, undefined)),
_ = start_grpc_client_channel(GwName, maps:get(handler, RawConf, undefined)),
%% XXX: How to monitor it ?
_ = start_grpc_server(GwType, maps:get(server, RawConf, undefined)),
_ = start_grpc_server(GwName, maps:get(server, RawConf, undefined)),
NRawConf = maps:without(
[server, handler],
RawConf#{pool_name => PoolName}
),
Listeners = emqx_gateway_utils:normalize_rawconf(
NRawConf#{handler => GwType}
NRawConf#{handler => GwName}
),
ListenerPids = lists:map(fun(Lis) ->
start_listener(GwType, Ctx, Lis)
start_listener(GwName, Ctx, Lis)
end, Listeners),
{ok, ListenerPids, _GwState = #{ctx => Ctx}}.
on_gateway_update(NewGateway, OldGateway, GwState = #{ctx := Ctx}) ->
GwType = maps:get(type, NewGateway),
GwName = maps:get(name, NewGateway),
try
%% XXX: 1. How hot-upgrade the changes ???
%% XXX: 2. Check the New confs first before destroy old instance ???
@ -120,40 +120,40 @@ on_gateway_update(NewGateway, OldGateway, GwState = #{ctx := Ctx}) ->
Class : Reason : Stk ->
logger:error("Failed to update ~s; "
"reason: {~0p, ~0p} stacktrace: ~0p",
[GwType, Class, Reason, Stk]),
[GwName, Class, Reason, Stk]),
{error, {Class, Reason}}
end.
on_gateway_unload(_Gateway = #{ type := GwType,
on_gateway_unload(_Gateway = #{ name := GwName,
rawconf := RawConf
}, _GwState) ->
Listeners = emqx_gateway_utils:normalize_rawconf(RawConf),
lists:foreach(fun(Lis) ->
stop_listener(GwType, Lis)
stop_listener(GwName, Lis)
end, Listeners).
pool_name(GwType) ->
list_to_atom(lists:concat([GwType, "_gcli_pool"])).
pool_name(GwName) ->
list_to_atom(lists:concat([GwName, "_gcli_pool"])).
%%--------------------------------------------------------------------
%% Internal funcs
%%--------------------------------------------------------------------
start_listener(GwType, Ctx, {Type, ListenOn, SocketOpts, Cfg}) ->
start_listener(GwName, Ctx, {Type, ListenOn, SocketOpts, Cfg}) ->
ListenOnStr = emqx_gateway_utils:format_listenon(ListenOn),
case start_listener(GwType, Ctx, Type, ListenOn, SocketOpts, Cfg) of
case start_listener(GwName, Ctx, Type, ListenOn, SocketOpts, Cfg) of
{ok, Pid} ->
?ULOG("Start ~s:~s listener on ~s successfully.~n",
[GwType, Type, ListenOnStr]),
[GwName, Type, ListenOnStr]),
Pid;
{error, Reason} ->
?ELOG("Failed to start ~s:~s listener on ~s: ~0p~n",
[GwType, Type, ListenOnStr, Reason]),
[GwName, Type, ListenOnStr, Reason]),
throw({badconf, Reason})
end.
start_listener(GwType, Ctx, Type, ListenOn, SocketOpts, Cfg) ->
Name = name(GwType, Type),
start_listener(GwName, Ctx, Type, ListenOn, SocketOpts, Cfg) ->
Name = name(GwName, Type),
NCfg = Cfg#{
ctx => Ctx,
frame_mod => emqx_exproto_frame,
@ -172,8 +172,8 @@ do_start_listener(udp, Name, ListenOn, Opts, MFA) ->
do_start_listener(dtls, Name, ListenOn, Opts, MFA) ->
esockd:open_dtls(Name, ListenOn, Opts, MFA).
name(GwType, Type) ->
list_to_atom(lists:concat([GwType, ":", Type])).
name(GwName, Type) ->
list_to_atom(lists:concat([GwName, ":", Type])).
merge_default_by_type(Type, Options) when Type =:= tcp;
Type =:= ssl ->
@ -196,18 +196,18 @@ merge_default_by_type(Type, Options) when Type =:= udp;
[{udp_options, Default} | Options]
end.
stop_listener(GwType, {Type, ListenOn, SocketOpts, Cfg}) ->
StopRet = stop_listener(GwType, Type, ListenOn, SocketOpts, Cfg),
stop_listener(GwName, {Type, ListenOn, SocketOpts, Cfg}) ->
StopRet = stop_listener(GwName, Type, ListenOn, SocketOpts, Cfg),
ListenOnStr = emqx_gateway_utils:format_listenon(ListenOn),
case StopRet of
ok -> ?ULOG("Stop ~s:~s listener on ~s successfully.~n",
[GwType, Type, ListenOnStr]);
[GwName, Type, ListenOnStr]);
{error, Reason} ->
?ELOG("Failed to stop ~s:~s listener on ~s: ~0p~n",
[GwType, Type, ListenOnStr, Reason])
[GwName, Type, ListenOnStr, Reason])
end,
StopRet.
stop_listener(GwType, Type, ListenOn, _SocketOpts, _Cfg) ->
Name = name(GwType, Type),
stop_listener(GwName, Type, ListenOn, _SocketOpts, _Cfg) ->
Name = name(GwName, Type),
esockd:close(Name, ListenOn).

View File

@ -47,7 +47,7 @@ unreg() ->
%% emqx_gateway_registry callbacks
%%--------------------------------------------------------------------
on_gateway_load(_Gateway = #{ type := GwType,
on_gateway_load(_Gateway = #{ name := GwName,
rawconf := RawConf
}, Ctx) ->
@ -66,12 +66,12 @@ on_gateway_load(_Gateway = #{ type := GwType,
Listeners = emqx_gateway_utils:normalize_rawconf(RawConf),
ListenerPids = lists:map(fun(Lis) ->
start_listener(GwType, Ctx, Lis)
start_listener(GwName, Ctx, Lis)
end, Listeners),
{ok, ListenerPids, _GwState = #{ctx => Ctx}}.
on_gateway_update(NewGateway, OldGateway, GwState = #{ctx := Ctx}) ->
GwType = maps:get(type, NewGateway),
GwName = maps:get(name, NewGateway),
try
%% XXX: 1. How hot-upgrade the changes ???
%% XXX: 2. Check the New confs first before destroy old instance ???
@ -81,11 +81,11 @@ on_gateway_update(NewGateway, OldGateway, GwState = #{ctx := Ctx}) ->
Class : Reason : Stk ->
logger:error("Failed to update ~s; "
"reason: {~0p, ~0p} stacktrace: ~0p",
[GwType, Class, Reason, Stk]),
[GwName, Class, Reason, Stk]),
{error, {Class, Reason}}
end.
on_gateway_unload(_Gateway = #{ type := GwType,
on_gateway_unload(_Gateway = #{ name := GwName,
rawconf := RawConf
}, _GwState) ->
%% XXX:
@ -96,28 +96,28 @@ on_gateway_unload(_Gateway = #{ type := GwType,
Listeners = emqx_gateway_utils:normalize_rawconf(RawConf),
lists:foreach(fun(Lis) ->
stop_listener(GwType, Lis)
stop_listener(GwName, Lis)
end, Listeners).
%%--------------------------------------------------------------------
%% Internal funcs
%%--------------------------------------------------------------------
start_listener(GwType, Ctx, {Type, ListenOn, SocketOpts, Cfg}) ->
start_listener(GwName, Ctx, {Type, ListenOn, SocketOpts, Cfg}) ->
ListenOnStr = emqx_gateway_utils:format_listenon(ListenOn),
case start_listener(GwType, Ctx, Type, ListenOn, SocketOpts, Cfg) of
case start_listener(GwName, Ctx, Type, ListenOn, SocketOpts, Cfg) of
{ok, Pid} ->
?ULOG("Start ~s:~s listener on ~s successfully.~n",
[GwType, Type, ListenOnStr]),
[GwName, Type, ListenOnStr]),
Pid;
{error, Reason} ->
?ELOG("Failed to start ~s:~s listener on ~s: ~0p~n",
[GwType, Type, ListenOnStr, Reason]),
[GwName, Type, ListenOnStr, Reason]),
throw({badconf, Reason})
end.
start_listener(GwType, Ctx, Type, ListenOn, SocketOpts, Cfg) ->
Name = name(GwType, udp),
start_listener(GwName, Ctx, Type, ListenOn, SocketOpts, Cfg) ->
Name = name(GwName, udp),
NCfg = Cfg#{ctx => Ctx},
NSocketOpts = merge_default(SocketOpts),
Options = [{config, NCfg}|NSocketOpts],
@ -128,8 +128,8 @@ start_listener(GwType, Ctx, Type, ListenOn, SocketOpts, Cfg) ->
lwm2m_coap_server:start_dtls(Name, ListenOn, Options)
end.
name(GwType, Type) ->
list_to_atom(lists:concat([GwType, ":", Type])).
name(GwName, Type) ->
list_to_atom(lists:concat([GwName, ":", Type])).
merge_default(Options) ->
Default = emqx_gateway_utils:default_udp_options(),
@ -141,20 +141,20 @@ merge_default(Options) ->
[{udp_options, Default} | Options]
end.
stop_listener(GwType, {Type, ListenOn, SocketOpts, Cfg}) ->
StopRet = stop_listener(GwType, Type, ListenOn, SocketOpts, Cfg),
stop_listener(GwName, {Type, ListenOn, SocketOpts, Cfg}) ->
StopRet = stop_listener(GwName, Type, ListenOn, SocketOpts, Cfg),
ListenOnStr = emqx_gateway_utils:format_listenon(ListenOn),
case StopRet of
ok -> ?ULOG("Stop ~s:~s listener on ~s successfully.~n",
[GwType, Type, ListenOnStr]);
[GwName, Type, ListenOnStr]);
{error, Reason} ->
?ELOG("Failed to stop ~s:~s listener on ~s: ~0p~n",
[GwType, Type, ListenOnStr, Reason])
[GwName, Type, ListenOnStr, Reason])
end,
StopRet.
stop_listener(GwType, Type, ListenOn, _SocketOpts, _Cfg) ->
Name = name(GwType, Type),
stop_listener(GwName, Type, ListenOn, _SocketOpts, _Cfg) ->
Name = name(GwName, Type),
case Type of
udp ->
lwm2m_coap_server:stop_udp(Name, ListenOn);

View File

@ -47,7 +47,7 @@ unreg() ->
%% emqx_gateway_registry callbacks
%%--------------------------------------------------------------------
on_gateway_load(_Gateway = #{ type := GwType,
on_gateway_load(_Gateway = #{ name := GwName,
rawconf := RawConf
}, Ctx) ->
@ -64,7 +64,7 @@ on_gateway_load(_Gateway = #{ type := GwType,
end,
PredefTopics = maps:get(predefined, RawConf, []),
{ok, RegistrySvr} = emqx_sn_registry:start_link(GwType, PredefTopics),
{ok, RegistrySvr} = emqx_sn_registry:start_link(GwName, PredefTopics),
NRawConf = maps:without(
[broadcast, predefined],
@ -73,11 +73,11 @@ on_gateway_load(_Gateway = #{ type := GwType,
Listeners = emqx_gateway_utils:normalize_rawconf(NRawConf),
ListenerPids = lists:map(fun(Lis) ->
start_listener(GwType, Ctx, Lis)
start_listener(GwName, Ctx, Lis)
end, Listeners),
{ok, ListenerPids, _InstaState = #{ctx => Ctx}}.
on_gateway_update(NewGateway = #{type := GwType}, OldGateway,
on_gateway_update(NewGateway = #{name := GwName}, OldGateway,
GwState = #{ctx := Ctx}) ->
try
%% XXX: 1. How hot-upgrade the changes ???
@ -88,37 +88,37 @@ on_gateway_update(NewGateway = #{type := GwType}, OldGateway,
Class : Reason : Stk ->
logger:error("Failed to update ~s; "
"reason: {~0p, ~0p} stacktrace: ~0p",
[GwType, Class, Reason, Stk]),
[GwName, Class, Reason, Stk]),
{error, {Class, Reason}}
end.
on_gateway_unload(_Insta = #{ type := GwType,
on_gateway_unload(_Insta = #{ name := GwName,
rawconf := RawConf
}, _GwState) ->
Listeners = emqx_gateway_utils:normalize_rawconf(RawConf),
lists:foreach(fun(Lis) ->
stop_listener(GwType, Lis)
stop_listener(GwName, Lis)
end, Listeners).
%%--------------------------------------------------------------------
%% Internal funcs
%%--------------------------------------------------------------------
start_listener(GwType, Ctx, {Type, ListenOn, SocketOpts, Cfg}) ->
start_listener(GwName, Ctx, {Type, ListenOn, SocketOpts, Cfg}) ->
ListenOnStr = emqx_gateway_utils:format_listenon(ListenOn),
case start_listener(GwType, Ctx, Type, ListenOn, SocketOpts, Cfg) of
case start_listener(GwName, Ctx, Type, ListenOn, SocketOpts, Cfg) of
{ok, Pid} ->
?ULOG("Start ~s:~s listener on ~s successfully.~n",
[GwType, Type, ListenOnStr]),
[GwName, Type, ListenOnStr]),
Pid;
{error, Reason} ->
?ELOG("Failed to start ~s:~s listener on ~s: ~0p~n",
[GwType, Type, ListenOnStr, Reason]),
[GwName, Type, ListenOnStr, Reason]),
throw({badconf, Reason})
end.
start_listener(GwType, Ctx, Type, ListenOn, SocketOpts, Cfg) ->
Name = name(GwType, Type),
start_listener(GwName, Ctx, Type, ListenOn, SocketOpts, Cfg) ->
Name = name(GwName, Type),
NCfg = Cfg#{
ctx => Ctx,
frame_mod => emqx_sn_frame,
@ -127,8 +127,8 @@ start_listener(GwType, Ctx, Type, ListenOn, SocketOpts, Cfg) ->
esockd:open_udp(Name, ListenOn, merge_default(SocketOpts),
{emqx_gateway_conn, start_link, [NCfg]}).
name(GwType, Type) ->
list_to_atom(lists:concat([GwType, ":", Type])).
name(GwName, Type) ->
list_to_atom(lists:concat([GwName, ":", Type])).
merge_default(Options) ->
Default = emqx_gateway_utils:default_udp_options(),
@ -140,18 +140,18 @@ merge_default(Options) ->
[{udp_options, Default} | Options]
end.
stop_listener(GwType, {Type, ListenOn, SocketOpts, Cfg}) ->
StopRet = stop_listener(GwType, Type, ListenOn, SocketOpts, Cfg),
stop_listener(GwName, {Type, ListenOn, SocketOpts, Cfg}) ->
StopRet = stop_listener(GwName, Type, ListenOn, SocketOpts, Cfg),
ListenOnStr = emqx_gateway_utils:format_listenon(ListenOn),
case StopRet of
ok -> ?ULOG("Stop ~s:~s listener on ~s successfully.~n",
[GwType, Type, ListenOnStr]);
[GwName, Type, ListenOnStr]);
{error, Reason} ->
?ELOG("Failed to stop ~s:~s listener on ~s: ~0p~n",
[GwType, Type, ListenOnStr, Reason])
[GwName, Type, ListenOnStr, Reason])
end,
StopRet.
stop_listener(GwType, Type, ListenOn, _SocketOpts, _Cfg) ->
Name = name(GwType, Type),
stop_listener(GwName, Type, ListenOn, _SocketOpts, _Cfg) ->
Name = name(GwName, Type),
esockd:close(Name, ListenOn).

View File

@ -49,21 +49,21 @@ unreg() ->
%% emqx_gateway_registry callbacks
%%--------------------------------------------------------------------
on_gateway_load(_Gateway = #{ type := GwType,
on_gateway_load(_Gateway = #{ name := GwName,
rawconf := RawConf
}, Ctx) ->
%% Step1. Fold the rawconfs to listeners
Listeners = emqx_gateway_utils:normalize_rawconf(RawConf),
%% Step2. Start listeners or escokd:specs
ListenerPids = lists:map(fun(Lis) ->
start_listener(GwType, Ctx, Lis)
start_listener(GwName, Ctx, Lis)
end, Listeners),
%% FIXME: How to throw an exception to interrupt the restart logic ?
%% FIXME: Assign ctx to GwState
{ok, ListenerPids, _GwState = #{ctx => Ctx}}.
on_gateway_update(NewGateway, OldGateway, GwState = #{ctx := Ctx}) ->
GwType = maps:get(type, NewGateway),
GwName = maps:get(name, NewGateway),
try
%% XXX: 1. How hot-upgrade the changes ???
%% XXX: 2. Check the New confs first before destroy old state???
@ -73,37 +73,37 @@ on_gateway_update(NewGateway, OldGateway, GwState = #{ctx := Ctx}) ->
Class : Reason : Stk ->
logger:error("Failed to update ~s; "
"reason: {~0p, ~0p} stacktrace: ~0p",
[GwType, Class, Reason, Stk]),
[GwName, Class, Reason, Stk]),
{error, {Class, Reason}}
end.
on_gateway_unload(_Gateway = #{ type := GwType,
on_gateway_unload(_Gateway = #{ name := GwName,
rawconf := RawConf
}, _GwState) ->
Listeners = emqx_gateway_utils:normalize_rawconf(RawConf),
lists:foreach(fun(Lis) ->
stop_listener(GwType, Lis)
stop_listener(GwName, Lis)
end, Listeners).
%%--------------------------------------------------------------------
%% Internal funcs
%%--------------------------------------------------------------------
start_listener(GwType, Ctx, {Type, ListenOn, SocketOpts, Cfg}) ->
start_listener(GwName, Ctx, {Type, ListenOn, SocketOpts, Cfg}) ->
ListenOnStr = emqx_gateway_utils:format_listenon(ListenOn),
case start_listener(GwType, Ctx, Type, ListenOn, SocketOpts, Cfg) of
case start_listener(GwName, Ctx, Type, ListenOn, SocketOpts, Cfg) of
{ok, Pid} ->
?ULOG("Start ~s:~s listener on ~s successfully.~n",
[GwType, Type, ListenOnStr]),
[GwName, Type, ListenOnStr]),
Pid;
{error, Reason} ->
?ELOG("Failed to start ~s:~s listener on ~s: ~0p~n",
[GwType, Type, ListenOnStr, Reason]),
[GwName, Type, ListenOnStr, Reason]),
throw({badconf, Reason})
end.
start_listener(GwType, Ctx, Type, ListenOn, SocketOpts, Cfg) ->
Name = name(GwType, Type),
start_listener(GwName, Ctx, Type, ListenOn, SocketOpts, Cfg) ->
Name = name(GwName, Type),
NCfg = Cfg#{
ctx => Ctx,
frame_mod => emqx_stomp_frame,
@ -112,8 +112,8 @@ start_listener(GwType, Ctx, Type, ListenOn, SocketOpts, Cfg) ->
esockd:open(Name, ListenOn, merge_default(SocketOpts),
{emqx_gateway_conn, start_link, [NCfg]}).
name(GwType, Type) ->
list_to_atom(lists:concat([GwType, ":", Type])).
name(GwName, Type) ->
list_to_atom(lists:concat([GwName, ":", Type])).
merge_default(Options) ->
Default = emqx_gateway_utils:default_tcp_options(),
@ -125,18 +125,18 @@ merge_default(Options) ->
[{tcp_options, Default} | Options]
end.
stop_listener(GwType, {Type, ListenOn, SocketOpts, Cfg}) ->
StopRet = stop_listener(GwType, Type, ListenOn, SocketOpts, Cfg),
stop_listener(GwName, {Type, ListenOn, SocketOpts, Cfg}) ->
StopRet = stop_listener(GwName, Type, ListenOn, SocketOpts, Cfg),
ListenOnStr = emqx_gateway_utils:format_listenon(ListenOn),
case StopRet of
ok -> ?ULOG("Stop ~s:~s listener on ~s successfully.~n",
[GwType, Type, ListenOnStr]);
[GwName, Type, ListenOnStr]);
{error, Reason} ->
?ELOG("Failed to stop ~s:~s listener on ~s: ~0p~n",
[GwType, Type, ListenOnStr, Reason])
[GwName, Type, ListenOnStr, Reason])
end,
StopRet.
stop_listener(GwType, Type, ListenOn, _SocketOpts, _Cfg) ->
Name = name(GwType, Type),
stop_listener(GwName, Type, ListenOn, _SocketOpts, _Cfg) ->
Name = name(GwName, Type),
esockd:close(Name, ListenOn).