chore(gw): rename rawconf to config
This commit is contained in:
parent
9c855ba8c2
commit
86e28d5abb
|
@ -21,14 +21,6 @@
|
|||
|
||||
-type listener() :: #{}.
|
||||
|
||||
%% The RawConf got from emqx:get_config/1
|
||||
-type rawconf() ::
|
||||
#{ clientinfo_override => map()
|
||||
, authenticator => map()
|
||||
, listeners => listener()
|
||||
, atom() => any()
|
||||
}.
|
||||
|
||||
%% @doc The Gateway defination
|
||||
-type gateway() ::
|
||||
#{ name := gateway_name()
|
||||
|
@ -40,7 +32,7 @@
|
|||
%% Timestamp in millisecond
|
||||
, started_at => integer()
|
||||
%% Appears only in getting gateway info
|
||||
, rawconf => rawconf()
|
||||
, config => emqx_config:config()
|
||||
}.
|
||||
|
||||
-endif.
|
||||
|
|
|
@ -49,9 +49,9 @@ unreg() ->
|
|||
%%--------------------------------------------------------------------
|
||||
|
||||
on_gateway_load(_Gateway = #{name := GwName,
|
||||
rawconf := RawConf
|
||||
config := Config
|
||||
}, Ctx) ->
|
||||
Listeners = emqx_gateway_utils:normalize_rawconf(RawConf),
|
||||
Listeners = emqx_gateway_utils:normalize_config(Config),
|
||||
ListenerPids = lists:map(fun(Lis) ->
|
||||
start_listener(GwName, Ctx, Lis)
|
||||
end, Listeners),
|
||||
|
@ -74,9 +74,9 @@ on_gateway_update(NewGateway, OldGateway, GwState = #{ctx := Ctx}) ->
|
|||
end.
|
||||
|
||||
on_gateway_unload(_Gateway = #{ name := GwName,
|
||||
rawconf := RawConf
|
||||
config := Config
|
||||
}, _GwState) ->
|
||||
Listeners = emqx_gateway_utils:normalize_rawconf(RawConf),
|
||||
Listeners = emqx_gateway_utils:normalize_config(Config),
|
||||
lists:foreach(fun(Lis) ->
|
||||
stop_listener(GwName, Lis)
|
||||
end, Listeners).
|
||||
|
|
|
@ -29,6 +29,8 @@
|
|||
, list/0
|
||||
]).
|
||||
|
||||
-export([update_rawconf/2]).
|
||||
|
||||
-spec registered_gateway() ->
|
||||
[{gateway_name(), emqx_gateway_registry:descriptor()}].
|
||||
registered_gateway() ->
|
||||
|
@ -41,13 +43,13 @@ registered_gateway() ->
|
|||
list() ->
|
||||
emqx_gateway_sup:list_gateway_insta().
|
||||
|
||||
-spec load(gateway_name(), rawconf())
|
||||
-spec load(gateway_name(), emqx_config:config())
|
||||
-> {ok, pid()}
|
||||
| {error, any()}.
|
||||
load(Name, RawConf) ->
|
||||
load(Name, Config) ->
|
||||
Gateway = #{ name => Name
|
||||
, descr => undefined
|
||||
, rawconf => RawConf
|
||||
, config => Config
|
||||
},
|
||||
emqx_gateway_sup:load_gateway(Gateway).
|
||||
|
||||
|
@ -59,9 +61,9 @@ unload(Name) ->
|
|||
lookup(Name) ->
|
||||
emqx_gateway_sup:lookup_gateway(Name).
|
||||
|
||||
-spec update(gateway_name(), rawconf()) -> ok | {error, any()}.
|
||||
update(Name, RawConf) ->
|
||||
NewGateway = #{name => Name, rawconf => RawConf},
|
||||
-spec update(gateway_name(), emqx_config:config()) -> ok | {error, any()}.
|
||||
update(Name, Config) ->
|
||||
NewGateway = #{name => Name, config => Config},
|
||||
emqx_gateway_sup:update_gateway(NewGateway).
|
||||
|
||||
-spec start(gateway_name()) -> ok | {error, any()}.
|
||||
|
@ -72,6 +74,13 @@ start(Name) ->
|
|||
stop(Name) ->
|
||||
emqx_gateway_sup:stop_gateway_insta(Name).
|
||||
|
||||
-spec update_rawconf(gateway_name(), emqx_config:raw_config())
|
||||
-> ok
|
||||
| {error, any()}.
|
||||
update_rawconf(_Name, _RawConf) ->
|
||||
%% TODO:
|
||||
ok.
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Internal funcs
|
||||
%%--------------------------------------------------------------------
|
||||
|
|
|
@ -356,8 +356,9 @@ gateway_insta(delete, Request) ->
|
|||
gateway_insta(get, Request) ->
|
||||
Name = binary_to_existing_atom(cowboy_req:binding(name, Request)),
|
||||
case emqx_gateway:lookup(Name) of
|
||||
#{rawconf := RawConf} ->
|
||||
{200, RawConf};
|
||||
#{config := Config} ->
|
||||
%% TODO: ??? RawConf or Config ??
|
||||
{200, Config};
|
||||
undefined ->
|
||||
{404, <<"Not Found">>}
|
||||
end;
|
||||
|
|
|
@ -88,8 +88,8 @@ call(Pid, Req) ->
|
|||
|
||||
init([Gateway, Ctx0, _GwDscrptr]) ->
|
||||
process_flag(trap_exit, true),
|
||||
#{name := GwName, rawconf := RawConf} = Gateway,
|
||||
Ctx = do_init_context(GwName, RawConf, Ctx0),
|
||||
#{name := GwName, config := Config } = Gateway,
|
||||
Ctx = do_init_context(GwName, Config, Ctx0),
|
||||
State = #state{
|
||||
gw = Gateway,
|
||||
ctx = Ctx,
|
||||
|
@ -105,8 +105,8 @@ init([Gateway, Ctx0, _GwDscrptr]) ->
|
|||
{ok, NState}
|
||||
end.
|
||||
|
||||
do_init_context(GwName, RawConf, Ctx) ->
|
||||
Auth = case maps:get(authentication, RawConf, #{enable => false}) of
|
||||
do_init_context(GwName, Config, Ctx) ->
|
||||
Auth = case maps:get(authentication, Config, #{enable => false}) of
|
||||
#{enable := false} -> undefined;
|
||||
AuthCfg when is_map(AuthCfg) ->
|
||||
case maps:get(enable, AuthCfg, true) of
|
||||
|
|
|
@ -39,7 +39,7 @@ gateways(Status) ->
|
|||
Gateways = lists:map(fun({GwName, _}) ->
|
||||
case emqx_gateway:lookup(GwName) of
|
||||
undefined -> #{name => GwName, status => unloaded};
|
||||
GwInfo = #{rawconf := RawConf} ->
|
||||
GwInfo = #{config := Config} ->
|
||||
GwInfo0 = emqx_gateway_utils:unix_ts_to_rfc3339(
|
||||
[created_at, started_at, stopped_at],
|
||||
GwInfo),
|
||||
|
@ -48,7 +48,7 @@ gateways(Status) ->
|
|||
created_at,
|
||||
started_at,
|
||||
stopped_at], GwInfo0),
|
||||
GwInfo1#{listeners => get_listeners_status(GwName, RawConf)}
|
||||
GwInfo1#{listeners => get_listeners_status(GwName, Config)}
|
||||
|
||||
end
|
||||
end, emqx_gateway_registry:list()),
|
||||
|
@ -59,8 +59,8 @@ gateways(Status) ->
|
|||
end.
|
||||
|
||||
%% @private
|
||||
get_listeners_status(GwName, RawConf) ->
|
||||
Listeners = emqx_gateway_utils:normalize_rawconf(RawConf),
|
||||
get_listeners_status(GwName, Config) ->
|
||||
Listeners = emqx_gateway_utils:normalize_config(Config),
|
||||
lists:map(fun({Type, LisName, ListenOn, _, _}) ->
|
||||
Name0 = listener_name(GwName, Type, LisName),
|
||||
Name = {Name0, ListenOn},
|
||||
|
|
|
@ -158,7 +158,8 @@ fields(dtls_listener) ->
|
|||
[ {"$name", t(ref(dtls_listener_settings))}];
|
||||
|
||||
fields(listener_settings) ->
|
||||
[ {bind, t(union(ip_port(), integer()))}
|
||||
[ {enable, t(boolean(), undefined, true)}
|
||||
, {bind, t(union(ip_port(), integer()))}
|
||||
, {acceptors, t(integer(), undefined, 8)}
|
||||
, {max_connections, t(integer(), undefined, 1024)}
|
||||
, {max_conn_rate, t(integer())}
|
||||
|
|
|
@ -31,7 +31,7 @@
|
|||
, unix_ts_to_rfc3339/2
|
||||
]).
|
||||
|
||||
-export([ normalize_rawconf/1
|
||||
-export([ normalize_config/1
|
||||
]).
|
||||
|
||||
%% Common Envs
|
||||
|
@ -118,14 +118,14 @@ unix_ts_to_rfc3339(Key, Map) ->
|
|||
emqx_rule_funcs:unix_ts_to_rfc3339(Ts, <<"millisecond">>)}
|
||||
end.
|
||||
|
||||
-spec normalize_rawconf(rawconf())
|
||||
-spec normalize_config(emqx_config:config())
|
||||
-> list({ Type :: udp | tcp | ssl | dtls
|
||||
, Name :: atom()
|
||||
, ListenOn :: esockd:listen_on()
|
||||
, SocketOpts :: esockd:option()
|
||||
, Cfg :: map()
|
||||
}).
|
||||
normalize_rawconf(RawConf) ->
|
||||
normalize_config(RawConf) ->
|
||||
LisMap = maps:get(listeners, RawConf, #{}),
|
||||
Cfg0 = maps:without([listeners], RawConf),
|
||||
lists:append(maps:fold(fun(Type, Liss, AccIn1) ->
|
||||
|
|
|
@ -84,7 +84,7 @@ start_grpc_client_channel(GwName, Options = #{address := UriStr}) ->
|
|||
grpc_client_sup:create_channel_pool(GwName, SvrAddr, ClientOpts).
|
||||
|
||||
on_gateway_load(_Gateway = #{ name := GwName,
|
||||
rawconf := RawConf
|
||||
config := Config
|
||||
}, Ctx) ->
|
||||
%% XXX: How to monitor it ?
|
||||
%% Start grpc client pool & client channel
|
||||
|
@ -92,17 +92,17 @@ on_gateway_load(_Gateway = #{ name := GwName,
|
|||
PoolSize = emqx_vm:schedulers() * 2,
|
||||
{ok, _} = emqx_pool_sup:start_link(PoolName, hash, PoolSize,
|
||||
{emqx_exproto_gcli, start_link, []}),
|
||||
_ = start_grpc_client_channel(GwName, maps:get(handler, RawConf, undefined)),
|
||||
_ = start_grpc_client_channel(GwName, maps:get(handler, Config, undefined)),
|
||||
|
||||
%% XXX: How to monitor it ?
|
||||
_ = start_grpc_server(GwName, maps:get(server, RawConf, undefined)),
|
||||
_ = start_grpc_server(GwName, maps:get(server, Config, undefined)),
|
||||
|
||||
NRawConf = maps:without(
|
||||
NConfig = maps:without(
|
||||
[server, handler],
|
||||
RawConf#{pool_name => PoolName}
|
||||
Config#{pool_name => PoolName}
|
||||
),
|
||||
Listeners = emqx_gateway_utils:normalize_rawconf(
|
||||
NRawConf#{handler => GwName}
|
||||
Listeners = emqx_gateway_utils:normalize_config(
|
||||
NConfig#{handler => GwName}
|
||||
),
|
||||
ListenerPids = lists:map(fun(Lis) ->
|
||||
start_listener(GwName, Ctx, Lis)
|
||||
|
@ -125,9 +125,9 @@ on_gateway_update(NewGateway, OldGateway, GwState = #{ctx := Ctx}) ->
|
|||
end.
|
||||
|
||||
on_gateway_unload(_Gateway = #{ name := GwName,
|
||||
rawconf := RawConf
|
||||
config := Config
|
||||
}, _GwState) ->
|
||||
Listeners = emqx_gateway_utils:normalize_rawconf(RawConf),
|
||||
Listeners = emqx_gateway_utils:normalize_config(Config),
|
||||
lists:foreach(fun(Lis) ->
|
||||
stop_listener(GwName, Lis)
|
||||
end, Listeners).
|
||||
|
|
|
@ -48,7 +48,7 @@ unreg() ->
|
|||
%%--------------------------------------------------------------------
|
||||
|
||||
on_gateway_load(_Gateway = #{ name := GwName,
|
||||
rawconf := RawConf
|
||||
config := Config
|
||||
}, Ctx) ->
|
||||
|
||||
%% Handler
|
||||
|
@ -58,13 +58,13 @@ on_gateway_load(_Gateway = #{ name := GwName,
|
|||
emqx_lwm2m_coap_resource, undefined
|
||||
),
|
||||
%% Xml registry
|
||||
{ok, _} = emqx_lwm2m_xml_object_db:start_link(maps:get(xml_dir, RawConf)),
|
||||
{ok, _} = emqx_lwm2m_xml_object_db:start_link(maps:get(xml_dir, Config)),
|
||||
|
||||
%% XXX: Self managed table?
|
||||
%% TODO: Improve it later
|
||||
{ok, _} = emqx_lwm2m_cm:start_link(),
|
||||
|
||||
Listeners = emqx_gateway_utils:normalize_rawconf(RawConf),
|
||||
Listeners = emqx_gateway_utils:normalize_config(Config),
|
||||
ListenerPids = lists:map(fun(Lis) ->
|
||||
start_listener(GwName, Ctx, Lis)
|
||||
end, Listeners),
|
||||
|
@ -86,7 +86,7 @@ on_gateway_update(NewGateway, OldGateway, GwState = #{ctx := Ctx}) ->
|
|||
end.
|
||||
|
||||
on_gateway_unload(_Gateway = #{ name := GwName,
|
||||
rawconf := RawConf
|
||||
config := Config
|
||||
}, _GwState) ->
|
||||
%% XXX:
|
||||
lwm2m_coap_server_registry:remove_handler(
|
||||
|
@ -94,7 +94,7 @@ on_gateway_unload(_Gateway = #{ name := GwName,
|
|||
emqx_lwm2m_coap_resource, undefined
|
||||
),
|
||||
|
||||
Listeners = emqx_gateway_utils:normalize_rawconf(RawConf),
|
||||
Listeners = emqx_gateway_utils:normalize_config(Config),
|
||||
lists:foreach(fun(Lis) ->
|
||||
stop_listener(GwName, Lis)
|
||||
end, Listeners).
|
||||
|
|
|
@ -48,29 +48,29 @@ unreg() ->
|
|||
%%--------------------------------------------------------------------
|
||||
|
||||
on_gateway_load(_Gateway = #{ name := GwName,
|
||||
rawconf := RawConf
|
||||
config := Config
|
||||
}, Ctx) ->
|
||||
|
||||
%% We Also need to start `emqx_sn_broadcast` &
|
||||
%% `emqx_sn_registry` process
|
||||
case maps:get(broadcast, RawConf, false) of
|
||||
case maps:get(broadcast, Config, false) of
|
||||
false ->
|
||||
ok;
|
||||
true ->
|
||||
%% FIXME:
|
||||
Port = 1884,
|
||||
SnGwId = maps:get(gateway_id, RawConf, undefined),
|
||||
SnGwId = maps:get(gateway_id, Config, undefined),
|
||||
_ = emqx_sn_broadcast:start_link(SnGwId, Port), ok
|
||||
end,
|
||||
|
||||
PredefTopics = maps:get(predefined, RawConf, []),
|
||||
PredefTopics = maps:get(predefined, Config, []),
|
||||
{ok, RegistrySvr} = emqx_sn_registry:start_link(GwName, PredefTopics),
|
||||
|
||||
NRawConf = maps:without(
|
||||
NConfig = maps:without(
|
||||
[broadcast, predefined],
|
||||
RawConf#{registry => emqx_sn_registry:lookup_name(RegistrySvr)}
|
||||
Config#{registry => emqx_sn_registry:lookup_name(RegistrySvr)}
|
||||
),
|
||||
Listeners = emqx_gateway_utils:normalize_rawconf(NRawConf),
|
||||
Listeners = emqx_gateway_utils:normalize_config(NConfig),
|
||||
|
||||
ListenerPids = lists:map(fun(Lis) ->
|
||||
start_listener(GwName, Ctx, Lis)
|
||||
|
@ -93,9 +93,9 @@ on_gateway_update(NewGateway = #{name := GwName}, OldGateway,
|
|||
end.
|
||||
|
||||
on_gateway_unload(_Insta = #{ name := GwName,
|
||||
rawconf := RawConf
|
||||
config := Config
|
||||
}, _GwState) ->
|
||||
Listeners = emqx_gateway_utils:normalize_rawconf(RawConf),
|
||||
Listeners = emqx_gateway_utils:normalize_config(Config),
|
||||
lists:foreach(fun(Lis) ->
|
||||
stop_listener(GwName, Lis)
|
||||
end, Listeners).
|
||||
|
|
|
@ -50,10 +50,10 @@ unreg() ->
|
|||
%%--------------------------------------------------------------------
|
||||
|
||||
on_gateway_load(_Gateway = #{ name := GwName,
|
||||
rawconf := RawConf
|
||||
config := Config
|
||||
}, Ctx) ->
|
||||
%% Step1. Fold the rawconfs to listeners
|
||||
Listeners = emqx_gateway_utils:normalize_rawconf(RawConf),
|
||||
%% Step1. Fold the config to listeners
|
||||
Listeners = emqx_gateway_utils:normalize_config(Config),
|
||||
%% Step2. Start listeners or escokd:specs
|
||||
ListenerPids = lists:map(fun(Lis) ->
|
||||
start_listener(GwName, Ctx, Lis)
|
||||
|
@ -78,9 +78,9 @@ on_gateway_update(NewGateway, OldGateway, GwState = #{ctx := Ctx}) ->
|
|||
end.
|
||||
|
||||
on_gateway_unload(_Gateway = #{ name := GwName,
|
||||
rawconf := RawConf
|
||||
config := Config
|
||||
}, _GwState) ->
|
||||
Listeners = emqx_gateway_utils:normalize_rawconf(RawConf),
|
||||
Listeners = emqx_gateway_utils:normalize_config(Config),
|
||||
lists:foreach(fun(Lis) ->
|
||||
stop_listener(GwName, Lis)
|
||||
end, Listeners).
|
||||
|
|
Loading…
Reference in New Issue