feat(lwm2m): support subscribe/unsubscribe operations
This commit is contained in:
parent
bfbf377a45
commit
561c95c31a
|
@ -321,7 +321,7 @@ default_udp_options() ->
|
||||||
[binary].
|
[binary].
|
||||||
|
|
||||||
default_subopts() ->
|
default_subopts() ->
|
||||||
#{rh => 0, %% Retain Handling
|
#{rh => 1, %% Retain Handling
|
||||||
rap => 0, %% Retain as Publish
|
rap => 0, %% Retain as Publish
|
||||||
nl => 0, %% No Local
|
nl => 0, %% No Local
|
||||||
qos => 0, %% QoS
|
qos => 0, %% QoS
|
||||||
|
|
|
@ -144,11 +144,6 @@ init(ConnInfoT = #{peername := {PeerHost, _},
|
||||||
, with_context = with_context(Ctx, ClientInfo)
|
, with_context = with_context(Ctx, ClientInfo)
|
||||||
}.
|
}.
|
||||||
|
|
||||||
with_context(Ctx, ClientInfo) ->
|
|
||||||
fun(Type, Topic) ->
|
|
||||||
with_context(Type, Topic, Ctx, ClientInfo)
|
|
||||||
end.
|
|
||||||
|
|
||||||
lookup_cmd(Channel, Path, Action) ->
|
lookup_cmd(Channel, Path, Action) ->
|
||||||
gen_server:call(Channel, {?FUNCTION_NAME, Path, Action}).
|
gen_server:call(Channel, {?FUNCTION_NAME, Path, Action}).
|
||||||
|
|
||||||
|
@ -196,7 +191,8 @@ handle_timeout(_, _, Channel) ->
|
||||||
%% Handle call
|
%% Handle call
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
handle_call({lookup_cmd, Path, Type}, _From, #channel{session = Session} = Channel) ->
|
handle_call({lookup_cmd, Path, Type}, _From,
|
||||||
|
Channel = #channel{session = Session}) ->
|
||||||
Result = emqx_lwm2m_session:find_cmd_record(Path, Type, Session),
|
Result = emqx_lwm2m_session:find_cmd_record(Path, Type, Session),
|
||||||
{reply, {ok, Result}, Channel};
|
{reply, {ok, Result}, Channel};
|
||||||
|
|
||||||
|
@ -204,14 +200,46 @@ handle_call({send_cmd, Cmd}, _From, Channel) ->
|
||||||
{ok, Outs, Channel2} = call_session(send_cmd, Cmd, Channel),
|
{ok, Outs, Channel2} = call_session(send_cmd, Cmd, Channel),
|
||||||
{reply, ok, Outs, Channel2};
|
{reply, ok, Outs, Channel2};
|
||||||
|
|
||||||
handle_call({subscribe, _Topic, _SubOpts}, _From, Channel) ->
|
handle_call({subscribe, Topic, SubOpts}, _From,
|
||||||
{reply, {error, noimpl}, Channel};
|
Channel = #channel{
|
||||||
|
ctx = Ctx,
|
||||||
|
clientinfo = ClientInfo
|
||||||
|
= #{clientid := ClientId,
|
||||||
|
mountpoint := Mountpoint},
|
||||||
|
session = Session}) ->
|
||||||
|
NSubOpts = maps:merge(
|
||||||
|
emqx_gateway_utils:default_subopts(),
|
||||||
|
SubOpts),
|
||||||
|
MountedTopic = emqx_mountpoint:mount(Mountpoint, Topic),
|
||||||
|
_ = emqx_broker:subscribe(MountedTopic, ClientId, NSubOpts),
|
||||||
|
|
||||||
handle_call({unsubscribe, _Topic}, _From, Channel) ->
|
_ = run_hooks(Ctx, 'session.subscribed',
|
||||||
{reply, {error, noimpl}, Channel};
|
[ClientInfo, MountedTopic, NSubOpts]),
|
||||||
|
%% modifty session state
|
||||||
|
Subs = emqx_lwm2m_session:info(subscriptions, Session),
|
||||||
|
NSubs = maps:put(MountedTopic, NSubOpts, Subs),
|
||||||
|
NSession = emqx_lwm2m_session:set_subscriptions(NSubs, Session),
|
||||||
|
{reply, ok, Channel#channel{session = NSession}};
|
||||||
|
|
||||||
handle_call(subscriptions, _From, Channel) ->
|
handle_call({unsubscribe, Topic}, _From,
|
||||||
{reply, {error, noimpl}, Channel};
|
Channel = #channel{
|
||||||
|
ctx = Ctx,
|
||||||
|
clientinfo = ClientInfo
|
||||||
|
= #{mountpoint := Mountpoint},
|
||||||
|
session = Session}) ->
|
||||||
|
MountedTopic = emqx_mountpoint:mount(Mountpoint, Topic),
|
||||||
|
ok = emqx_broker:unsubscribe(MountedTopic),
|
||||||
|
_ = run_hooks(Ctx, 'session.unsubscribe',
|
||||||
|
[ClientInfo, MountedTopic, #{}]),
|
||||||
|
%% modifty session state
|
||||||
|
Subs = emqx_lwm2m_session:info(subscriptions, Session),
|
||||||
|
NSubs = maps:remove(MountedTopic, Subs),
|
||||||
|
NSession = emqx_lwm2m_session:set_subscriptions(NSubs, Session),
|
||||||
|
{reply, ok, Channel#channel{session = NSession}};
|
||||||
|
|
||||||
|
handle_call(subscriptions, _From, Channel = #channel{session = Session}) ->
|
||||||
|
Subs = maps:to_list(emqx_lwm2m_session:info(subscriptions, Session)),
|
||||||
|
{reply, {ok, Subs}, Channel};
|
||||||
|
|
||||||
handle_call(kick, _From, Channel) ->
|
handle_call(kick, _From, Channel) ->
|
||||||
NChannel = ensure_disconnected(kicked, Channel),
|
NChannel = ensure_disconnected(kicked, Channel),
|
||||||
|
@ -497,29 +525,44 @@ gets([H | T], Map) ->
|
||||||
gets([], Val) ->
|
gets([], Val) ->
|
||||||
Val.
|
Val.
|
||||||
|
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% With Context
|
||||||
|
|
||||||
|
with_context(Ctx, ClientInfo) ->
|
||||||
|
fun(Type, Topic) ->
|
||||||
|
with_context(Type, Topic, Ctx, ClientInfo)
|
||||||
|
end.
|
||||||
|
|
||||||
with_context(publish, [Topic, Msg], Ctx, ClientInfo) ->
|
with_context(publish, [Topic, Msg], Ctx, ClientInfo) ->
|
||||||
case emqx_gateway_ctx:authorize(Ctx, ClientInfo, publish, Topic) of
|
case emqx_gateway_ctx:authorize(Ctx, ClientInfo, publish, Topic) of
|
||||||
allow ->
|
allow ->
|
||||||
emqx:publish(Msg);
|
_ = emqx_broker:publish(Msg),
|
||||||
|
ok;
|
||||||
_ ->
|
_ ->
|
||||||
?SLOG(error, #{ msg => "publish_denied"
|
?SLOG(error, #{ msg => "publish_denied"
|
||||||
, topic => Topic
|
, topic => Topic
|
||||||
})
|
}),
|
||||||
|
{error, deny}
|
||||||
end;
|
end;
|
||||||
|
|
||||||
with_context(subscribe, [Topic, Opts], Ctx, #{username := Username} = ClientInfo) ->
|
with_context(subscribe, [Topic, Opts], Ctx, ClientInfo) ->
|
||||||
|
#{clientid := ClientId,
|
||||||
|
endpoint_name := EndpointName} = ClientInfo,
|
||||||
case emqx_gateway_ctx:authorize(Ctx, ClientInfo, subscribe, Topic) of
|
case emqx_gateway_ctx:authorize(Ctx, ClientInfo, subscribe, Topic) of
|
||||||
allow ->
|
allow ->
|
||||||
run_hooks(Ctx, 'session.subscribed', [ClientInfo, Topic, Opts]),
|
run_hooks(Ctx, 'session.subscribed', [ClientInfo, Topic, Opts]),
|
||||||
?SLOG(debug, #{ msg => "subscribe_topic_succeed"
|
?SLOG(debug, #{ msg => "subscribe_topic_succeed"
|
||||||
, topic => Topic
|
, topic => Topic
|
||||||
, endpoint_name => Username
|
, clientid => ClientId
|
||||||
|
, endpoint_name => EndpointName
|
||||||
}),
|
}),
|
||||||
emqx:subscribe(Topic, Username, Opts);
|
emqx_broker:subscribe(Topic, ClientId, Opts),
|
||||||
|
ok;
|
||||||
_ ->
|
_ ->
|
||||||
?SLOG(error, #{ msg => "subscribe_denied"
|
?SLOG(error, #{ msg => "subscribe_denied"
|
||||||
, topic => Topic
|
, topic => Topic
|
||||||
})
|
}),
|
||||||
|
{error, deny}
|
||||||
end;
|
end;
|
||||||
|
|
||||||
with_context(metrics, Name, Ctx, _ClientInfo) ->
|
with_context(metrics, Name, Ctx, _ClientInfo) ->
|
||||||
|
|
|
@ -25,9 +25,11 @@
|
||||||
-export([ new/0, init/4, update/3, parse_object_list/1
|
-export([ new/0, init/4, update/3, parse_object_list/1
|
||||||
, reregister/3, on_close/1, find_cmd_record/3]).
|
, reregister/3, on_close/1, find_cmd_record/3]).
|
||||||
|
|
||||||
|
%% Info & Stats
|
||||||
-export([ info/1
|
-export([ info/1
|
||||||
, info/2
|
, info/2
|
||||||
, stats/1
|
, stats/1
|
||||||
|
, stats/2
|
||||||
]).
|
]).
|
||||||
|
|
||||||
-export([ handle_coap_in/3
|
-export([ handle_coap_in/3
|
||||||
|
@ -37,6 +39,10 @@
|
||||||
, send_cmd/3
|
, send_cmd/3
|
||||||
, set_reply/2]).
|
, set_reply/2]).
|
||||||
|
|
||||||
|
%% froce update subscriptions
|
||||||
|
-export([ set_subscriptions/2
|
||||||
|
]).
|
||||||
|
|
||||||
-export_type([session/0]).
|
-export_type([session/0]).
|
||||||
|
|
||||||
-type request_context() :: map().
|
-type request_context() :: map().
|
||||||
|
@ -66,6 +72,7 @@
|
||||||
, last_active_at :: non_neg_integer()
|
, last_active_at :: non_neg_integer()
|
||||||
, created_at :: non_neg_integer()
|
, created_at :: non_neg_integer()
|
||||||
, cmd_record :: cmd_record()
|
, cmd_record :: cmd_record()
|
||||||
|
, subscriptions :: map()
|
||||||
}).
|
}).
|
||||||
|
|
||||||
-type session() :: #session{}.
|
-type session() :: #session{}.
|
||||||
|
@ -83,7 +90,9 @@
|
||||||
-define(lwm2m_up_dm_topic, {<<"/v1/up/dm">>, 0}).
|
-define(lwm2m_up_dm_topic, {<<"/v1/up/dm">>, 0}).
|
||||||
|
|
||||||
%% steal from emqx_session
|
%% steal from emqx_session
|
||||||
-define(INFO_KEYS, [subscriptions,
|
-define(INFO_KEYS, [id,
|
||||||
|
is_persistent,
|
||||||
|
subscriptions,
|
||||||
upgrade_qos,
|
upgrade_qos,
|
||||||
retry_interval,
|
retry_interval,
|
||||||
await_rel_timeout,
|
await_rel_timeout,
|
||||||
|
@ -99,7 +108,8 @@
|
||||||
mqueue_dropped,
|
mqueue_dropped,
|
||||||
next_pkt_id,
|
next_pkt_id,
|
||||||
awaiting_rel_cnt,
|
awaiting_rel_cnt,
|
||||||
awaiting_rel_max
|
awaiting_rel_max,
|
||||||
|
latency_stats
|
||||||
]).
|
]).
|
||||||
|
|
||||||
-define(OUT_LIST_KEY, out_list).
|
-define(OUT_LIST_KEY, out_list).
|
||||||
|
@ -118,7 +128,9 @@ new() ->
|
||||||
, is_cache_mode = false
|
, is_cache_mode = false
|
||||||
, mountpoint = <<>>
|
, mountpoint = <<>>
|
||||||
, cmd_record = #{queue => queue:new()}
|
, cmd_record = #{queue => queue:new()}
|
||||||
, lifetime = emqx:get_config([gateway, lwm2m, lifetime_max])}.
|
, lifetime = emqx:get_config([gateway, lwm2m, lifetime_max])
|
||||||
|
, subscriptions = #{}
|
||||||
|
}.
|
||||||
|
|
||||||
-spec init(coap_message(), binary(), function(), session()) -> map().
|
-spec init(coap_message(), binary(), function(), session()) -> map().
|
||||||
init(#coap_message{options = Opts,
|
init(#coap_message{options = Opts,
|
||||||
|
@ -152,7 +164,7 @@ update(Msg, WithContext, Session) ->
|
||||||
on_close(Session) ->
|
on_close(Session) ->
|
||||||
#{topic := Topic} = downlink_topic(),
|
#{topic := Topic} = downlink_topic(),
|
||||||
MountedTopic = mount(Topic, Session),
|
MountedTopic = mount(Topic, Session),
|
||||||
emqx:unsubscribe(MountedTopic),
|
emqx_broker:unsubscribe(MountedTopic),
|
||||||
MountedTopic.
|
MountedTopic.
|
||||||
|
|
||||||
-spec find_cmd_record(cmd_path(), cmd_type(), session()) -> cmd_result().
|
-spec find_cmd_record(cmd_path(), cmd_type(), session()) -> cmd_result().
|
||||||
|
@ -169,55 +181,56 @@ info(Session) ->
|
||||||
info(Keys, Session) when is_list(Keys) ->
|
info(Keys, Session) when is_list(Keys) ->
|
||||||
[{Key, info(Key, Session)} || Key <- Keys];
|
[{Key, info(Key, Session)} || Key <- Keys];
|
||||||
|
|
||||||
info(location_path, #session{location_path = Path}) ->
|
info(id, _) ->
|
||||||
Path;
|
undefined;
|
||||||
|
info(is_persistent, _) ->
|
||||||
info(lifetime, #session{lifetime = LT}) ->
|
false;
|
||||||
LT;
|
info(subscriptions, #session{subscriptions = Subs}) ->
|
||||||
|
Subs;
|
||||||
info(reg_info, #session{reg_info = RI}) ->
|
|
||||||
RI;
|
|
||||||
|
|
||||||
info(subscriptions, _) ->
|
|
||||||
[];
|
|
||||||
info(subscriptions_cnt, _) ->
|
|
||||||
0;
|
|
||||||
info(subscriptions_max, _) ->
|
|
||||||
infinity;
|
|
||||||
info(upgrade_qos, _) ->
|
info(upgrade_qos, _) ->
|
||||||
?QOS_0;
|
false;
|
||||||
info(inflight, _) ->
|
|
||||||
emqx_inflight:new();
|
|
||||||
info(inflight_cnt, _) ->
|
|
||||||
0;
|
|
||||||
info(inflight_max, _) ->
|
|
||||||
0;
|
|
||||||
info(retry_interval, _) ->
|
info(retry_interval, _) ->
|
||||||
infinity;
|
|
||||||
info(mqueue, _) ->
|
|
||||||
emqx_mqueue:init(#{max_len => 0, store_qos0 => false});
|
|
||||||
info(mqueue_len, #session{queue = Queue}) ->
|
|
||||||
queue:len(Queue);
|
|
||||||
info(mqueue_max, _) ->
|
|
||||||
0;
|
0;
|
||||||
info(mqueue_dropped, _) ->
|
|
||||||
0;
|
|
||||||
info(next_pkt_id, _) ->
|
|
||||||
0;
|
|
||||||
info(awaiting_rel, _) ->
|
|
||||||
#{};
|
|
||||||
info(awaiting_rel_cnt, _) ->
|
|
||||||
0;
|
|
||||||
info(awaiting_rel_max, _) ->
|
|
||||||
infinity;
|
|
||||||
info(await_rel_timeout, _) ->
|
info(await_rel_timeout, _) ->
|
||||||
infinity;
|
infinity;
|
||||||
info(created_at, #session{created_at = CreatedAt}) ->
|
info(created_at, #session{created_at = CreatedAt}) ->
|
||||||
CreatedAt.
|
CreatedAt;
|
||||||
|
%% used for channel
|
||||||
|
info(location_path, #session{location_path = Path}) ->
|
||||||
|
Path;
|
||||||
|
info(lifetime, #session{lifetime = LT}) ->
|
||||||
|
LT;
|
||||||
|
info(reg_info, #session{reg_info = RI}) ->
|
||||||
|
RI.
|
||||||
|
|
||||||
%% @doc Get stats of the session.
|
|
||||||
-spec(stats(session()) -> emqx_types:stats()).
|
-spec(stats(session()) -> emqx_types:stats()).
|
||||||
stats(Session) -> info(?STATS_KEYS, Session).
|
stats(Session) -> stats(?STATS_KEYS, Session).
|
||||||
|
|
||||||
|
stats(Keys, Session) when is_list(Keys) ->
|
||||||
|
[{Key, stats(Key, Session)} || Key <- Keys];
|
||||||
|
|
||||||
|
stats(subscriptions_cnt, #session{subscriptions = Subs}) ->
|
||||||
|
maps:size(Subs);
|
||||||
|
stats(subscriptions_max, _) ->
|
||||||
|
infinity;
|
||||||
|
stats(inflight_cnt, _) ->
|
||||||
|
0;
|
||||||
|
stats(inflight_max, _) ->
|
||||||
|
0;
|
||||||
|
stats(mqueue_len, _) ->
|
||||||
|
0;
|
||||||
|
stats(mqueue_max, _) ->
|
||||||
|
0;
|
||||||
|
stats(mqueue_dropped, _) ->
|
||||||
|
0;
|
||||||
|
stats(next_pkt_id, _) ->
|
||||||
|
0;
|
||||||
|
stats(awaiting_rel_cnt, _) ->
|
||||||
|
0;
|
||||||
|
stats(awaiting_rel_max, _) ->
|
||||||
|
infinity;
|
||||||
|
stats(latency_stats, _) ->
|
||||||
|
#{}.
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% API
|
%% API
|
||||||
|
@ -242,6 +255,9 @@ set_reply(Msg, #session{coap = Coap} = Session) ->
|
||||||
send_cmd(Cmd, _, Session) ->
|
send_cmd(Cmd, _, Session) ->
|
||||||
return(send_cmd_impl(Cmd, Session)).
|
return(send_cmd_impl(Cmd, Session)).
|
||||||
|
|
||||||
|
set_subscriptions(Subs, Session) ->
|
||||||
|
Session#session{subscriptions = Subs}.
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Protocol Stack
|
%% Protocol Stack
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
@ -377,7 +393,10 @@ register_init(WithContext, #session{reg_info = RegInfo} = Session) ->
|
||||||
%% - subscribe to the downlink_topic and wait for commands
|
%% - subscribe to the downlink_topic and wait for commands
|
||||||
#{topic := Topic, qos := Qos} = downlink_topic(),
|
#{topic := Topic, qos := Qos} = downlink_topic(),
|
||||||
MountedTopic = mount(Topic, Session),
|
MountedTopic = mount(Topic, Session),
|
||||||
Session3 = subscribe(MountedTopic, Qos, WithContext, Session2),
|
SubOpts = maps:merge(
|
||||||
|
emqx_gateway_utils:default_subopts(),
|
||||||
|
#{qos => Qos}),
|
||||||
|
Session3 = do_subscribe(MountedTopic, SubOpts, WithContext, Session2),
|
||||||
Session4 = send_dl_msg(Session3),
|
Session4 = send_dl_msg(Session3),
|
||||||
|
|
||||||
%% - report the registration info
|
%% - report the registration info
|
||||||
|
@ -387,22 +406,33 @@ register_init(WithContext, #session{reg_info = RegInfo} = Session) ->
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Subscribe
|
%% Subscribe
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
proto_subscribe(WithContext, #session{wait_ack = WaitAck} = Session) ->
|
proto_subscribe(WithContext, #session{wait_ack = WaitAck} = Session) ->
|
||||||
#{topic := Topic, qos := Qos} = downlink_topic(),
|
#{topic := Topic, qos := Qos} = downlink_topic(),
|
||||||
MountedTopic = mount(Topic, Session),
|
MountedTopic = mount(Topic, Session),
|
||||||
Session2 = case WaitAck of
|
SubOpts = maps:merge(
|
||||||
|
emqx_gateway_utils:default_subopts(),
|
||||||
|
#{qos => Qos}),
|
||||||
|
NSession = case WaitAck of
|
||||||
undefined ->
|
undefined ->
|
||||||
Session;
|
Session;
|
||||||
Ctx ->
|
Ctx ->
|
||||||
MqttPayload = emqx_lwm2m_cmd:coap_failure_to_mqtt(Ctx, <<"coap_timeout">>),
|
MqttPayload = emqx_lwm2m_cmd:coap_failure_to_mqtt(
|
||||||
send_to_mqtt(Ctx, <<"coap_timeout">>, MqttPayload, WithContext, Session)
|
Ctx, <<"coap_timeout">>),
|
||||||
|
send_to_mqtt(Ctx, <<"coap_timeout">>,
|
||||||
|
MqttPayload, WithContext, Session)
|
||||||
end,
|
end,
|
||||||
subscribe(MountedTopic, Qos, WithContext, Session2).
|
do_subscribe(MountedTopic, SubOpts, WithContext, NSession).
|
||||||
|
|
||||||
subscribe(Topic, Qos, WithContext, Session) ->
|
do_subscribe(Topic, SubOpts, WithContext,
|
||||||
Opts = get_sub_opts(Qos),
|
Session = #session{subscriptions = Subs}) ->
|
||||||
WithContext(subscribe, [Topic, Opts]),
|
case WithContext(subscribe, [Topic, SubOpts]) of
|
||||||
Session.
|
{error, _} ->
|
||||||
|
Session;
|
||||||
|
ok ->
|
||||||
|
NSubs = maps:put(Topic, SubOpts, Subs),
|
||||||
|
Session#session{subscriptions = NSubs}
|
||||||
|
end.
|
||||||
|
|
||||||
send_auto_observe(RegInfo, Session) ->
|
send_auto_observe(RegInfo, Session) ->
|
||||||
%% - auto observe the objects
|
%% - auto observe the objects
|
||||||
|
@ -449,15 +479,6 @@ deliver_auto_observe_to_coap(AlternatePath, TermData, Session) ->
|
||||||
{Req, Ctx} = emqx_lwm2m_cmd:mqtt_to_coap(AlternatePath, TermData),
|
{Req, Ctx} = emqx_lwm2m_cmd:mqtt_to_coap(AlternatePath, TermData),
|
||||||
maybe_do_deliver_to_coap(Ctx, Req, 0, false, Session).
|
maybe_do_deliver_to_coap(Ctx, Req, 0, false, Session).
|
||||||
|
|
||||||
get_sub_opts(Qos) ->
|
|
||||||
#{
|
|
||||||
qos => Qos,
|
|
||||||
rap => 0,
|
|
||||||
nl => 0,
|
|
||||||
rh => 0,
|
|
||||||
is_new => false
|
|
||||||
}.
|
|
||||||
|
|
||||||
is_auto_observe() ->
|
is_auto_observe() ->
|
||||||
emqx:get_config([gateway, lwm2m, auto_observe]).
|
emqx:get_config([gateway, lwm2m, auto_observe]).
|
||||||
|
|
||||||
|
@ -609,7 +630,7 @@ proto_publish(Topic, Payload, Qos, Headers, WithContext,
|
||||||
%% TODO: Append message metadata into headers
|
%% TODO: Append message metadata into headers
|
||||||
Msg = emqx_message:make(Epn, Qos, MountedTopic,
|
Msg = emqx_message:make(Epn, Qos, MountedTopic,
|
||||||
emqx_json:encode(Payload), #{}, Headers),
|
emqx_json:encode(Payload), #{}, Headers),
|
||||||
WithContext(publish, [MountedTopic, Msg]),
|
_ = WithContext(publish, [MountedTopic, Msg]),
|
||||||
Session.
|
Session.
|
||||||
|
|
||||||
mount(Topic, #session{mountpoint = MountPoint}) when is_binary(Topic) ->
|
mount(Topic, #session{mountpoint = MountPoint}) when is_binary(Topic) ->
|
||||||
|
|
Loading…
Reference in New Issue