From cfc905aa1a7f12eeb9b541e49a43a573acc9a5a3 Mon Sep 17 00:00:00 2001 From: JianBo He Date: Sat, 9 Oct 2021 11:13:58 +0800 Subject: [PATCH] fix(gw): insert channel info into ets table --- .../src/emqx_gateway_api_clients.erl | 2 + apps/emqx_gateway/src/emqx_gateway_schema.erl | 6 +- .../src/lwm2m/emqx_lwm2m_channel.erl | 69 +++++++++++-------- .../src/lwm2m/emqx_lwm2m_session.erl | 5 +- 4 files changed, 51 insertions(+), 31 deletions(-) diff --git a/apps/emqx_gateway/src/emqx_gateway_api_clients.erl b/apps/emqx_gateway/src/emqx_gateway_api_clients.erl index a436c6935..3df8e96c9 100644 --- a/apps/emqx_gateway/src/emqx_gateway_api_clients.erl +++ b/apps/emqx_gateway/src/emqx_gateway_api_clients.erl @@ -71,6 +71,8 @@ apis() -> , {<<"lte_created_at">>, timestamp} , {<<"gte_connected_at">>, timestamp} , {<<"lte_connected_at">>, timestamp} + %% special keys for lwm2m protocol + , {<<"like_endpoint_name">>, binary} ]). -define(query_fun, {?MODULE, query}). diff --git a/apps/emqx_gateway/src/emqx_gateway_schema.erl b/apps/emqx_gateway/src/emqx_gateway_schema.erl index 32f24b2dd..c35fc45de 100644 --- a/apps/emqx_gateway/src/emqx_gateway_schema.erl +++ b/apps/emqx_gateway/src/emqx_gateway_schema.erl @@ -91,13 +91,13 @@ fields(coap) -> ] ++ gateway_common_options(); fields(lwm2m) -> - [ {xml_dir, sc(binary())} + [ {xml_dir, sc(binary(), "etc/lwm2m_xml")} , {lifetime_min, sc(duration(), "1s")} , {lifetime_max, sc(duration(), "86400s")} , {qmode_time_window, sc(integer(), 22)} , {auto_observe, sc(boolean(), false)} , {update_msg_publish_condition, sc(hoconsc:union([always, contains_object_list]))} - , {translators, sc(ref(translators))} + , {translators, sc_meta(ref(translators), #{nullable => false})} , {listeners, sc(ref(udp_listeners))} ] ++ gateway_common_options(); @@ -133,7 +133,7 @@ fields(translators) -> fields(translator) -> [ {topic, sc(binary())} - , {qos, sc(range(0, 2))} + , {qos, sc(range(0, 2), 0)} ]; fields(udp_listeners) -> diff --git a/apps/emqx_gateway/src/lwm2m/emqx_lwm2m_channel.erl b/apps/emqx_gateway/src/lwm2m/emqx_lwm2m_channel.erl index bfa898f0f..1dbf27474 100644 --- a/apps/emqx_gateway/src/lwm2m/emqx_lwm2m_channel.erl +++ b/apps/emqx_gateway/src/lwm2m/emqx_lwm2m_channel.erl @@ -41,27 +41,34 @@ ]). -record(channel, { - %% Context - ctx :: emqx_gateway_ctx:context(), - %% Connection Info - conninfo :: emqx_types:conninfo(), - %% Client Info - clientinfo :: emqx_types:clientinfo(), - %% Session - session :: emqx_lwm2m_session:session() | undefined, + %% Context + ctx :: emqx_gateway_ctx:context(), + %% Connection Info + conninfo :: emqx_types:conninfo(), + %% Client Info + clientinfo :: emqx_types:clientinfo(), + %% Session + session :: emqx_lwm2m_session:session() | undefined, + %% Timer + timers :: #{atom() => disable | undefined | reference()}, + with_context :: function() + }). - %% Timer - timers :: #{atom() => disable | undefined | reference()}, - - with_context :: function() - }). +%% TODO: +-define(DEFAULT_OVERRIDE, + #{ clientid => <<"">> %% Generate clientid by default + , username => <<"${Packet.querystring.epn}">> + , password => <<"">> + }). -define(INFO_KEYS, [conninfo, conn_state, clientinfo, session]). + -import(emqx_coap_medium, [reply/2, reply/3, reply/4, iter/3, iter/4]). %%-------------------------------------------------------------------- %% API %%-------------------------------------------------------------------- + info(Channel) -> maps:from_list(info(?INFO_KEYS, Channel)). @@ -75,7 +82,7 @@ info(conn_state, _) -> info(clientinfo, #channel{clientinfo = ClientInfo}) -> ClientInfo; info(session, #channel{session = Session}) -> - emqx_misc:maybe_apply(fun emqx_session:info/1, Session); + emqx_misc:maybe_apply(fun emqx_lwm2m_session:info/1, Session); info(clientid, #channel{clientinfo = #{clientid := ClientId}}) -> ClientId; info(ctx, #channel{ctx = Ctx}) -> @@ -114,13 +121,13 @@ init(ConnInfo = #{peername := {PeerHost, _}, , clientinfo = ClientInfo , timers = #{} , session = emqx_lwm2m_session:new() + %% FIXME: don't store anonymouse func , with_context = with_context(Ctx, ClientInfo) }. - with_context(Ctx, ClientInfo) -> fun(Type, Topic) -> - with_context(Type, Topic, Ctx, ClientInfo) + with_context(Type, Topic, Ctx, ClientInfo) end. lookup_cmd(Channel, Path, Action) -> @@ -294,11 +301,12 @@ enrich_clientinfo(#coap_message{options = Options} = Msg, Query = maps:get(uri_query, Options, #{}), case Query of #{<<"ep">> := Epn} -> - UserName = maps:get(<<"imei">>, Query, Epn), + %% TODO: put endpoint-name, lifetime into clientinfo ??? + Username = maps:get(<<"imei">>, Query, Epn), Password = maps:get(<<"password">>, Query, undefined), ClientId = maps:get(<<"device_id">>, Query, Epn), ClientInfo = - ClientInfo0#{username => UserName, + ClientInfo0#{username => Username, password => Password, clientid => ClientId}, {ok, NClientInfo} = fix_mountpoint(Msg, ClientInfo), @@ -356,8 +364,14 @@ process_connect(Channel = #channel{ctx = Ctx, ) of {ok, _} -> Mountpoint = maps:get(mountpoint, ClientInfo, <<>>), - NewResult = emqx_lwm2m_session:init(Msg, Mountpoint, WithContext, Session), - iter(Iter, maps:merge(Result, NewResult), Channel); + NewResult0 = emqx_lwm2m_session:init( + Msg, + Mountpoint, + WithContext, + Session + ), + NewResult1 = NewResult0#{events => [{event, connected}]}, + iter(Iter, maps:merge(Result, NewResult1), Channel); {error, Reason} -> ?LOG(error, "Failed to open session du to ~p", [Reason]), iter(Iter, reply({error, bad_request}, Msg, Result), Channel) @@ -386,12 +400,12 @@ with_context(publish, [Topic, Msg], Ctx, ClientInfo) -> ?LOG(error, "topic:~p not allow to publish ", [Topic]) end; -with_context(subscribe, [Topic, Opts], Ctx, #{username := UserName} = ClientInfo) -> +with_context(subscribe, [Topic, Opts], Ctx, #{username := Username} = ClientInfo) -> case emqx_gateway_ctx:authorize(Ctx, ClientInfo, subscribe, Topic) of allow -> - run_hooks(Ctx, 'session.subscribed', [ClientInfo, Topic, UserName]), - ?LOG(debug, "Subscribe topic: ~0p, Opts: ~0p, EndpointName: ~0p", [Topic, Opts, UserName]), - emqx:subscribe(Topic, UserName, Opts); + run_hooks(Ctx, 'session.subscribed', [ClientInfo, Topic, Opts]), + ?LOG(debug, "Subscribe topic: ~0p, Opts: ~0p, EndpointName: ~0p", [Topic, Opts, Username]), + emqx:subscribe(Topic, Username, Opts); _ -> ?LOG(error, "Topic: ~0p not allow to subscribe", [Topic]) end; @@ -479,14 +493,15 @@ process_out(Outs, Result, Channel, _) -> Reply -> [Reply | Outs2] end, - - {ok, {outgoing, Outs3}, Channel}. + Events = maps:get(events, Result, []), + {ok, [{outgoing, Outs3}] ++ Events, Channel}. process_reply(Reply, Result, #channel{session = Session} = Channel, _) -> Session2 = emqx_lwm2m_session:set_reply(Reply, Session), Outs = maps:get(out, Result, []), Outs2 = lists:reverse(Outs), - {ok, {outgoing, [Reply | Outs2]}, Channel#channel{session = Session2}}. + Events = maps:get(events, Result, []), + {ok, [{outgoing, [Reply | Outs2]}] ++ Events, Channel#channel{session = Session2}}. process_lifetime(_, Result, Channel, Iter) -> iter(Iter, Result, update_life_timer(Channel)). diff --git a/apps/emqx_gateway/src/lwm2m/emqx_lwm2m_session.erl b/apps/emqx_gateway/src/lwm2m/emqx_lwm2m_session.erl index ab27dfbca..38f1b59e3 100644 --- a/apps/emqx_gateway/src/lwm2m/emqx_lwm2m_session.erl +++ b/apps/emqx_gateway/src/lwm2m/emqx_lwm2m_session.erl @@ -62,6 +62,7 @@ , is_cache_mode :: boolean() , mountpoint :: binary() , last_active_at :: non_neg_integer() + , created_at :: non_neg_integer() , cmd_record :: cmd_record() }). @@ -109,6 +110,7 @@ new() -> #session{ coap = emqx_coap_tm:new() , queue = queue:new() , last_active_at = ?NOW + , created_at = erlang:system_time(millisecond) , is_cache_mode = false , mountpoint = <<>> , cmd_record = #{} @@ -206,7 +208,7 @@ info(awaiting_rel_max, _) -> infinity; info(await_rel_timeout, _) -> infinity; -info(created_at, #session{last_active_at = CreatedAt}) -> +info(created_at, #session{created_at = CreatedAt}) -> CreatedAt. %% @doc Get stats of the session. @@ -598,6 +600,7 @@ proto_publish(Topic, Payload, Qos, Headers, WithContext, mount(Topic, #session{mountpoint = MountPoint}) when is_binary(Topic) -> <>. +%% XXX: get these confs from params instead of shared mem downlink_topic() -> emqx:get_config([gateway, lwm2m, translators, command]).