From 561c95c31a4c99f73963ec03e7b568c3874f159c Mon Sep 17 00:00:00 2001 From: JianBo He Date: Fri, 17 Dec 2021 14:10:45 +0800 Subject: [PATCH] feat(lwm2m): support subscribe/unsubscribe operations --- apps/emqx_gateway/src/emqx_gateway_utils.erl | 2 +- .../src/lwm2m/emqx_lwm2m_channel.erl | 79 +++++++--- .../src/lwm2m/emqx_lwm2m_session.erl | 149 ++++++++++-------- 3 files changed, 147 insertions(+), 83 deletions(-) diff --git a/apps/emqx_gateway/src/emqx_gateway_utils.erl b/apps/emqx_gateway/src/emqx_gateway_utils.erl index 5d17fe6ca..fa74f9437 100644 --- a/apps/emqx_gateway/src/emqx_gateway_utils.erl +++ b/apps/emqx_gateway/src/emqx_gateway_utils.erl @@ -321,7 +321,7 @@ default_udp_options() -> [binary]. default_subopts() -> - #{rh => 0, %% Retain Handling + #{rh => 1, %% Retain Handling rap => 0, %% Retain as Publish nl => 0, %% No Local qos => 0, %% QoS diff --git a/apps/emqx_gateway/src/lwm2m/emqx_lwm2m_channel.erl b/apps/emqx_gateway/src/lwm2m/emqx_lwm2m_channel.erl index c28a4724d..3edee0f47 100644 --- a/apps/emqx_gateway/src/lwm2m/emqx_lwm2m_channel.erl +++ b/apps/emqx_gateway/src/lwm2m/emqx_lwm2m_channel.erl @@ -144,11 +144,6 @@ init(ConnInfoT = #{peername := {PeerHost, _}, , with_context = with_context(Ctx, ClientInfo) }. -with_context(Ctx, ClientInfo) -> - fun(Type, Topic) -> - with_context(Type, Topic, Ctx, ClientInfo) - end. - lookup_cmd(Channel, Path, Action) -> gen_server:call(Channel, {?FUNCTION_NAME, Path, Action}). @@ -196,7 +191,8 @@ handle_timeout(_, _, Channel) -> %% Handle call %%-------------------------------------------------------------------- -handle_call({lookup_cmd, Path, Type}, _From, #channel{session = Session} = Channel) -> +handle_call({lookup_cmd, Path, Type}, _From, + Channel = #channel{session = Session}) -> Result = emqx_lwm2m_session:find_cmd_record(Path, Type, Session), {reply, {ok, Result}, Channel}; @@ -204,14 +200,46 @@ handle_call({send_cmd, Cmd}, _From, Channel) -> {ok, Outs, Channel2} = call_session(send_cmd, Cmd, Channel), {reply, ok, Outs, Channel2}; -handle_call({subscribe, _Topic, _SubOpts}, _From, Channel) -> - {reply, {error, noimpl}, Channel}; +handle_call({subscribe, Topic, SubOpts}, _From, + Channel = #channel{ + ctx = Ctx, + clientinfo = ClientInfo + = #{clientid := ClientId, + mountpoint := Mountpoint}, + session = Session}) -> + NSubOpts = maps:merge( + emqx_gateway_utils:default_subopts(), + SubOpts), + MountedTopic = emqx_mountpoint:mount(Mountpoint, Topic), + _ = emqx_broker:subscribe(MountedTopic, ClientId, NSubOpts), -handle_call({unsubscribe, _Topic}, _From, Channel) -> - {reply, {error, noimpl}, Channel}; + _ = run_hooks(Ctx, 'session.subscribed', + [ClientInfo, MountedTopic, NSubOpts]), + %% modifty session state + Subs = emqx_lwm2m_session:info(subscriptions, Session), + NSubs = maps:put(MountedTopic, NSubOpts, Subs), + NSession = emqx_lwm2m_session:set_subscriptions(NSubs, Session), + {reply, ok, Channel#channel{session = NSession}}; -handle_call(subscriptions, _From, Channel) -> - {reply, {error, noimpl}, Channel}; +handle_call({unsubscribe, Topic}, _From, + Channel = #channel{ + ctx = Ctx, + clientinfo = ClientInfo + = #{mountpoint := Mountpoint}, + session = Session}) -> + MountedTopic = emqx_mountpoint:mount(Mountpoint, Topic), + ok = emqx_broker:unsubscribe(MountedTopic), + _ = run_hooks(Ctx, 'session.unsubscribe', + [ClientInfo, MountedTopic, #{}]), + %% modifty session state + Subs = emqx_lwm2m_session:info(subscriptions, Session), + NSubs = maps:remove(MountedTopic, Subs), + NSession = emqx_lwm2m_session:set_subscriptions(NSubs, Session), + {reply, ok, Channel#channel{session = NSession}}; + +handle_call(subscriptions, _From, Channel = #channel{session = Session}) -> + Subs = maps:to_list(emqx_lwm2m_session:info(subscriptions, Session)), + {reply, {ok, Subs}, Channel}; handle_call(kick, _From, Channel) -> NChannel = ensure_disconnected(kicked, Channel), @@ -497,29 +525,44 @@ gets([H | T], Map) -> gets([], Val) -> Val. +%%-------------------------------------------------------------------- +%% With Context + +with_context(Ctx, ClientInfo) -> + fun(Type, Topic) -> + with_context(Type, Topic, Ctx, ClientInfo) + end. + with_context(publish, [Topic, Msg], Ctx, ClientInfo) -> case emqx_gateway_ctx:authorize(Ctx, ClientInfo, publish, Topic) of allow -> - emqx:publish(Msg); + _ = emqx_broker:publish(Msg), + ok; _ -> ?SLOG(error, #{ msg => "publish_denied" , topic => Topic - }) + }), + {error, deny} end; -with_context(subscribe, [Topic, Opts], Ctx, #{username := Username} = ClientInfo) -> +with_context(subscribe, [Topic, Opts], Ctx, ClientInfo) -> + #{clientid := ClientId, + endpoint_name := EndpointName} = ClientInfo, case emqx_gateway_ctx:authorize(Ctx, ClientInfo, subscribe, Topic) of allow -> run_hooks(Ctx, 'session.subscribed', [ClientInfo, Topic, Opts]), ?SLOG(debug, #{ msg => "subscribe_topic_succeed" , topic => Topic - , endpoint_name => Username + , clientid => ClientId + , endpoint_name => EndpointName }), - emqx:subscribe(Topic, Username, Opts); + emqx_broker:subscribe(Topic, ClientId, Opts), + ok; _ -> ?SLOG(error, #{ msg => "subscribe_denied" , topic => Topic - }) + }), + {error, deny} end; with_context(metrics, Name, Ctx, _ClientInfo) -> diff --git a/apps/emqx_gateway/src/lwm2m/emqx_lwm2m_session.erl b/apps/emqx_gateway/src/lwm2m/emqx_lwm2m_session.erl index 3cf8fde78..bee1bedcd 100644 --- a/apps/emqx_gateway/src/lwm2m/emqx_lwm2m_session.erl +++ b/apps/emqx_gateway/src/lwm2m/emqx_lwm2m_session.erl @@ -25,9 +25,11 @@ -export([ new/0, init/4, update/3, parse_object_list/1 , reregister/3, on_close/1, find_cmd_record/3]). +%% Info & Stats -export([ info/1 , info/2 , stats/1 + , stats/2 ]). -export([ handle_coap_in/3 @@ -37,6 +39,10 @@ , send_cmd/3 , set_reply/2]). +%% froce update subscriptions +-export([ set_subscriptions/2 + ]). + -export_type([session/0]). -type request_context() :: map(). @@ -66,6 +72,7 @@ , last_active_at :: non_neg_integer() , created_at :: non_neg_integer() , cmd_record :: cmd_record() + , subscriptions :: map() }). -type session() :: #session{}. @@ -83,7 +90,9 @@ -define(lwm2m_up_dm_topic, {<<"/v1/up/dm">>, 0}). %% steal from emqx_session --define(INFO_KEYS, [subscriptions, +-define(INFO_KEYS, [id, + is_persistent, + subscriptions, upgrade_qos, retry_interval, await_rel_timeout, @@ -99,7 +108,8 @@ mqueue_dropped, next_pkt_id, awaiting_rel_cnt, - awaiting_rel_max + awaiting_rel_max, + latency_stats ]). -define(OUT_LIST_KEY, out_list). @@ -118,7 +128,9 @@ new() -> , is_cache_mode = false , mountpoint = <<>> , cmd_record = #{queue => queue:new()} - , lifetime = emqx:get_config([gateway, lwm2m, lifetime_max])}. + , lifetime = emqx:get_config([gateway, lwm2m, lifetime_max]) + , subscriptions = #{} + }. -spec init(coap_message(), binary(), function(), session()) -> map(). init(#coap_message{options = Opts, @@ -152,7 +164,7 @@ update(Msg, WithContext, Session) -> on_close(Session) -> #{topic := Topic} = downlink_topic(), MountedTopic = mount(Topic, Session), - emqx:unsubscribe(MountedTopic), + emqx_broker:unsubscribe(MountedTopic), MountedTopic. -spec find_cmd_record(cmd_path(), cmd_type(), session()) -> cmd_result(). @@ -169,55 +181,56 @@ info(Session) -> info(Keys, Session) when is_list(Keys) -> [{Key, info(Key, Session)} || Key <- Keys]; -info(location_path, #session{location_path = Path}) -> - Path; - -info(lifetime, #session{lifetime = LT}) -> - LT; - -info(reg_info, #session{reg_info = RI}) -> - RI; - -info(subscriptions, _) -> - []; -info(subscriptions_cnt, _) -> - 0; -info(subscriptions_max, _) -> - infinity; +info(id, _) -> + undefined; +info(is_persistent, _) -> + false; +info(subscriptions, #session{subscriptions = Subs}) -> + Subs; info(upgrade_qos, _) -> - ?QOS_0; -info(inflight, _) -> - emqx_inflight:new(); -info(inflight_cnt, _) -> - 0; -info(inflight_max, _) -> - 0; + false; info(retry_interval, _) -> - infinity; -info(mqueue, _) -> - emqx_mqueue:init(#{max_len => 0, store_qos0 => false}); -info(mqueue_len, #session{queue = Queue}) -> - queue:len(Queue); -info(mqueue_max, _) -> 0; -info(mqueue_dropped, _) -> - 0; -info(next_pkt_id, _) -> - 0; -info(awaiting_rel, _) -> - #{}; -info(awaiting_rel_cnt, _) -> - 0; -info(awaiting_rel_max, _) -> - infinity; info(await_rel_timeout, _) -> infinity; info(created_at, #session{created_at = CreatedAt}) -> - CreatedAt. + CreatedAt; +%% used for channel +info(location_path, #session{location_path = Path}) -> + Path; +info(lifetime, #session{lifetime = LT}) -> + LT; +info(reg_info, #session{reg_info = RI}) -> + RI. -%% @doc Get stats of the session. -spec(stats(session()) -> emqx_types:stats()). -stats(Session) -> info(?STATS_KEYS, Session). +stats(Session) -> stats(?STATS_KEYS, Session). + +stats(Keys, Session) when is_list(Keys) -> + [{Key, stats(Key, Session)} || Key <- Keys]; + +stats(subscriptions_cnt, #session{subscriptions = Subs}) -> + maps:size(Subs); +stats(subscriptions_max, _) -> + infinity; +stats(inflight_cnt, _) -> + 0; +stats(inflight_max, _) -> + 0; +stats(mqueue_len, _) -> + 0; +stats(mqueue_max, _) -> + 0; +stats(mqueue_dropped, _) -> + 0; +stats(next_pkt_id, _) -> + 0; +stats(awaiting_rel_cnt, _) -> + 0; +stats(awaiting_rel_max, _) -> + infinity; +stats(latency_stats, _) -> + #{}. %%-------------------------------------------------------------------- %% API @@ -242,6 +255,9 @@ set_reply(Msg, #session{coap = Coap} = Session) -> send_cmd(Cmd, _, Session) -> return(send_cmd_impl(Cmd, Session)). +set_subscriptions(Subs, Session) -> + Session#session{subscriptions = Subs}. + %%-------------------------------------------------------------------- %% Protocol Stack %%-------------------------------------------------------------------- @@ -377,7 +393,10 @@ register_init(WithContext, #session{reg_info = RegInfo} = Session) -> %% - subscribe to the downlink_topic and wait for commands #{topic := Topic, qos := Qos} = downlink_topic(), MountedTopic = mount(Topic, Session), - Session3 = subscribe(MountedTopic, Qos, WithContext, Session2), + SubOpts = maps:merge( + emqx_gateway_utils:default_subopts(), + #{qos => Qos}), + Session3 = do_subscribe(MountedTopic, SubOpts, WithContext, Session2), Session4 = send_dl_msg(Session3), %% - report the registration info @@ -387,22 +406,33 @@ register_init(WithContext, #session{reg_info = RegInfo} = Session) -> %%-------------------------------------------------------------------- %% Subscribe %%-------------------------------------------------------------------- + proto_subscribe(WithContext, #session{wait_ack = WaitAck} = Session) -> #{topic := Topic, qos := Qos} = downlink_topic(), MountedTopic = mount(Topic, Session), - Session2 = case WaitAck of + SubOpts = maps:merge( + emqx_gateway_utils:default_subopts(), + #{qos => Qos}), + NSession = case WaitAck of undefined -> Session; Ctx -> - MqttPayload = emqx_lwm2m_cmd:coap_failure_to_mqtt(Ctx, <<"coap_timeout">>), - send_to_mqtt(Ctx, <<"coap_timeout">>, MqttPayload, WithContext, Session) + MqttPayload = emqx_lwm2m_cmd:coap_failure_to_mqtt( + Ctx, <<"coap_timeout">>), + send_to_mqtt(Ctx, <<"coap_timeout">>, + MqttPayload, WithContext, Session) end, - subscribe(MountedTopic, Qos, WithContext, Session2). + do_subscribe(MountedTopic, SubOpts, WithContext, NSession). -subscribe(Topic, Qos, WithContext, Session) -> - Opts = get_sub_opts(Qos), - WithContext(subscribe, [Topic, Opts]), - Session. +do_subscribe(Topic, SubOpts, WithContext, + Session = #session{subscriptions = Subs}) -> + case WithContext(subscribe, [Topic, SubOpts]) of + {error, _} -> + Session; + ok -> + NSubs = maps:put(Topic, SubOpts, Subs), + Session#session{subscriptions = NSubs} + end. send_auto_observe(RegInfo, Session) -> %% - auto observe the objects @@ -449,15 +479,6 @@ deliver_auto_observe_to_coap(AlternatePath, TermData, Session) -> {Req, Ctx} = emqx_lwm2m_cmd:mqtt_to_coap(AlternatePath, TermData), maybe_do_deliver_to_coap(Ctx, Req, 0, false, Session). -get_sub_opts(Qos) -> - #{ - qos => Qos, - rap => 0, - nl => 0, - rh => 0, - is_new => false - }. - is_auto_observe() -> emqx:get_config([gateway, lwm2m, auto_observe]). @@ -609,7 +630,7 @@ proto_publish(Topic, Payload, Qos, Headers, WithContext, %% TODO: Append message metadata into headers Msg = emqx_message:make(Epn, Qos, MountedTopic, emqx_json:encode(Payload), #{}, Headers), - WithContext(publish, [MountedTopic, Msg]), + _ = WithContext(publish, [MountedTopic, Msg]), Session. mount(Topic, #session{mountpoint = MountPoint}) when is_binary(Topic) ->