fix(gw): insert channel info into ets table

This commit is contained in:
JianBo He 2021-10-09 11:13:58 +08:00
parent 9c414096c7
commit cfc905aa1a
4 changed files with 51 additions and 31 deletions

View File

@ -71,6 +71,8 @@ apis() ->
, {<<"lte_created_at">>, timestamp} , {<<"lte_created_at">>, timestamp}
, {<<"gte_connected_at">>, timestamp} , {<<"gte_connected_at">>, timestamp}
, {<<"lte_connected_at">>, timestamp} , {<<"lte_connected_at">>, timestamp}
%% special keys for lwm2m protocol
, {<<"like_endpoint_name">>, binary}
]). ]).
-define(query_fun, {?MODULE, query}). -define(query_fun, {?MODULE, query}).

View File

@ -91,13 +91,13 @@ fields(coap) ->
] ++ gateway_common_options(); ] ++ gateway_common_options();
fields(lwm2m) -> fields(lwm2m) ->
[ {xml_dir, sc(binary())} [ {xml_dir, sc(binary(), "etc/lwm2m_xml")}
, {lifetime_min, sc(duration(), "1s")} , {lifetime_min, sc(duration(), "1s")}
, {lifetime_max, sc(duration(), "86400s")} , {lifetime_max, sc(duration(), "86400s")}
, {qmode_time_window, sc(integer(), 22)} , {qmode_time_window, sc(integer(), 22)}
, {auto_observe, sc(boolean(), false)} , {auto_observe, sc(boolean(), false)}
, {update_msg_publish_condition, sc(hoconsc:union([always, contains_object_list]))} , {update_msg_publish_condition, sc(hoconsc:union([always, contains_object_list]))}
, {translators, sc(ref(translators))} , {translators, sc_meta(ref(translators), #{nullable => false})}
, {listeners, sc(ref(udp_listeners))} , {listeners, sc(ref(udp_listeners))}
] ++ gateway_common_options(); ] ++ gateway_common_options();
@ -133,7 +133,7 @@ fields(translators) ->
fields(translator) -> fields(translator) ->
[ {topic, sc(binary())} [ {topic, sc(binary())}
, {qos, sc(range(0, 2))} , {qos, sc(range(0, 2), 0)}
]; ];
fields(udp_listeners) -> fields(udp_listeners) ->

View File

@ -41,27 +41,34 @@
]). ]).
-record(channel, { -record(channel, {
%% Context %% Context
ctx :: emqx_gateway_ctx:context(), ctx :: emqx_gateway_ctx:context(),
%% Connection Info %% Connection Info
conninfo :: emqx_types:conninfo(), conninfo :: emqx_types:conninfo(),
%% Client Info %% Client Info
clientinfo :: emqx_types:clientinfo(), clientinfo :: emqx_types:clientinfo(),
%% Session %% Session
session :: emqx_lwm2m_session:session() | undefined, session :: emqx_lwm2m_session:session() | undefined,
%% Timer
timers :: #{atom() => disable | undefined | reference()},
with_context :: function()
}).
%% Timer %% TODO:
timers :: #{atom() => disable | undefined | reference()}, -define(DEFAULT_OVERRIDE,
#{ clientid => <<"">> %% Generate clientid by default
with_context :: function() , username => <<"${Packet.querystring.epn}">>
}). , password => <<"">>
}).
-define(INFO_KEYS, [conninfo, conn_state, clientinfo, session]). -define(INFO_KEYS, [conninfo, conn_state, clientinfo, session]).
-import(emqx_coap_medium, [reply/2, reply/3, reply/4, iter/3, iter/4]). -import(emqx_coap_medium, [reply/2, reply/3, reply/4, iter/3, iter/4]).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% API %% API
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
info(Channel) -> info(Channel) ->
maps:from_list(info(?INFO_KEYS, Channel)). maps:from_list(info(?INFO_KEYS, Channel)).
@ -75,7 +82,7 @@ info(conn_state, _) ->
info(clientinfo, #channel{clientinfo = ClientInfo}) -> info(clientinfo, #channel{clientinfo = ClientInfo}) ->
ClientInfo; ClientInfo;
info(session, #channel{session = Session}) -> info(session, #channel{session = Session}) ->
emqx_misc:maybe_apply(fun emqx_session:info/1, Session); emqx_misc:maybe_apply(fun emqx_lwm2m_session:info/1, Session);
info(clientid, #channel{clientinfo = #{clientid := ClientId}}) -> info(clientid, #channel{clientinfo = #{clientid := ClientId}}) ->
ClientId; ClientId;
info(ctx, #channel{ctx = Ctx}) -> info(ctx, #channel{ctx = Ctx}) ->
@ -114,13 +121,13 @@ init(ConnInfo = #{peername := {PeerHost, _},
, clientinfo = ClientInfo , clientinfo = ClientInfo
, timers = #{} , timers = #{}
, session = emqx_lwm2m_session:new() , session = emqx_lwm2m_session:new()
%% FIXME: don't store anonymouse func
, with_context = with_context(Ctx, ClientInfo) , with_context = with_context(Ctx, ClientInfo)
}. }.
with_context(Ctx, ClientInfo) -> with_context(Ctx, ClientInfo) ->
fun(Type, Topic) -> fun(Type, Topic) ->
with_context(Type, Topic, Ctx, ClientInfo) with_context(Type, Topic, Ctx, ClientInfo)
end. end.
lookup_cmd(Channel, Path, Action) -> lookup_cmd(Channel, Path, Action) ->
@ -294,11 +301,12 @@ enrich_clientinfo(#coap_message{options = Options} = Msg,
Query = maps:get(uri_query, Options, #{}), Query = maps:get(uri_query, Options, #{}),
case Query of case Query of
#{<<"ep">> := Epn} -> #{<<"ep">> := Epn} ->
UserName = maps:get(<<"imei">>, Query, Epn), %% TODO: put endpoint-name, lifetime into clientinfo ???
Username = maps:get(<<"imei">>, Query, Epn),
Password = maps:get(<<"password">>, Query, undefined), Password = maps:get(<<"password">>, Query, undefined),
ClientId = maps:get(<<"device_id">>, Query, Epn), ClientId = maps:get(<<"device_id">>, Query, Epn),
ClientInfo = ClientInfo =
ClientInfo0#{username => UserName, ClientInfo0#{username => Username,
password => Password, password => Password,
clientid => ClientId}, clientid => ClientId},
{ok, NClientInfo} = fix_mountpoint(Msg, ClientInfo), {ok, NClientInfo} = fix_mountpoint(Msg, ClientInfo),
@ -356,8 +364,14 @@ process_connect(Channel = #channel{ctx = Ctx,
) of ) of
{ok, _} -> {ok, _} ->
Mountpoint = maps:get(mountpoint, ClientInfo, <<>>), Mountpoint = maps:get(mountpoint, ClientInfo, <<>>),
NewResult = emqx_lwm2m_session:init(Msg, Mountpoint, WithContext, Session), NewResult0 = emqx_lwm2m_session:init(
iter(Iter, maps:merge(Result, NewResult), Channel); Msg,
Mountpoint,
WithContext,
Session
),
NewResult1 = NewResult0#{events => [{event, connected}]},
iter(Iter, maps:merge(Result, NewResult1), Channel);
{error, Reason} -> {error, Reason} ->
?LOG(error, "Failed to open session du to ~p", [Reason]), ?LOG(error, "Failed to open session du to ~p", [Reason]),
iter(Iter, reply({error, bad_request}, Msg, Result), Channel) iter(Iter, reply({error, bad_request}, Msg, Result), Channel)
@ -386,12 +400,12 @@ with_context(publish, [Topic, Msg], Ctx, ClientInfo) ->
?LOG(error, "topic:~p not allow to publish ", [Topic]) ?LOG(error, "topic:~p not allow to publish ", [Topic])
end; end;
with_context(subscribe, [Topic, Opts], Ctx, #{username := UserName} = ClientInfo) -> with_context(subscribe, [Topic, Opts], Ctx, #{username := Username} = ClientInfo) ->
case emqx_gateway_ctx:authorize(Ctx, ClientInfo, subscribe, Topic) of case emqx_gateway_ctx:authorize(Ctx, ClientInfo, subscribe, Topic) of
allow -> allow ->
run_hooks(Ctx, 'session.subscribed', [ClientInfo, Topic, UserName]), run_hooks(Ctx, 'session.subscribed', [ClientInfo, Topic, Opts]),
?LOG(debug, "Subscribe topic: ~0p, Opts: ~0p, EndpointName: ~0p", [Topic, Opts, UserName]), ?LOG(debug, "Subscribe topic: ~0p, Opts: ~0p, EndpointName: ~0p", [Topic, Opts, Username]),
emqx:subscribe(Topic, UserName, Opts); emqx:subscribe(Topic, Username, Opts);
_ -> _ ->
?LOG(error, "Topic: ~0p not allow to subscribe", [Topic]) ?LOG(error, "Topic: ~0p not allow to subscribe", [Topic])
end; end;
@ -479,14 +493,15 @@ process_out(Outs, Result, Channel, _) ->
Reply -> Reply ->
[Reply | Outs2] [Reply | Outs2]
end, end,
Events = maps:get(events, Result, []),
{ok, {outgoing, Outs3}, Channel}. {ok, [{outgoing, Outs3}] ++ Events, Channel}.
process_reply(Reply, Result, #channel{session = Session} = Channel, _) -> process_reply(Reply, Result, #channel{session = Session} = Channel, _) ->
Session2 = emqx_lwm2m_session:set_reply(Reply, Session), Session2 = emqx_lwm2m_session:set_reply(Reply, Session),
Outs = maps:get(out, Result, []), Outs = maps:get(out, Result, []),
Outs2 = lists:reverse(Outs), Outs2 = lists:reverse(Outs),
{ok, {outgoing, [Reply | Outs2]}, Channel#channel{session = Session2}}. Events = maps:get(events, Result, []),
{ok, [{outgoing, [Reply | Outs2]}] ++ Events, Channel#channel{session = Session2}}.
process_lifetime(_, Result, Channel, Iter) -> process_lifetime(_, Result, Channel, Iter) ->
iter(Iter, Result, update_life_timer(Channel)). iter(Iter, Result, update_life_timer(Channel)).

View File

@ -62,6 +62,7 @@
, is_cache_mode :: boolean() , is_cache_mode :: boolean()
, mountpoint :: binary() , mountpoint :: binary()
, last_active_at :: non_neg_integer() , last_active_at :: non_neg_integer()
, created_at :: non_neg_integer()
, cmd_record :: cmd_record() , cmd_record :: cmd_record()
}). }).
@ -109,6 +110,7 @@ new() ->
#session{ coap = emqx_coap_tm:new() #session{ coap = emqx_coap_tm:new()
, queue = queue:new() , queue = queue:new()
, last_active_at = ?NOW , last_active_at = ?NOW
, created_at = erlang:system_time(millisecond)
, is_cache_mode = false , is_cache_mode = false
, mountpoint = <<>> , mountpoint = <<>>
, cmd_record = #{} , cmd_record = #{}
@ -206,7 +208,7 @@ info(awaiting_rel_max, _) ->
infinity; infinity;
info(await_rel_timeout, _) -> info(await_rel_timeout, _) ->
infinity; infinity;
info(created_at, #session{last_active_at = CreatedAt}) -> info(created_at, #session{created_at = CreatedAt}) ->
CreatedAt. CreatedAt.
%% @doc Get stats of the session. %% @doc Get stats of the session.
@ -598,6 +600,7 @@ proto_publish(Topic, Payload, Qos, Headers, WithContext,
mount(Topic, #session{mountpoint = MountPoint}) when is_binary(Topic) -> mount(Topic, #session{mountpoint = MountPoint}) when is_binary(Topic) ->
<<MountPoint/binary, Topic/binary>>. <<MountPoint/binary, Topic/binary>>.
%% XXX: get these confs from params instead of shared mem
downlink_topic() -> downlink_topic() ->
emqx:get_config([gateway, lwm2m, translators, command]). emqx:get_config([gateway, lwm2m, translators, command]).