Improve the session management

This commit is contained in:
Feng Lee 2018-04-04 15:28:01 +08:00
parent 6459481024
commit 39548cc399
12 changed files with 295 additions and 116 deletions

View File

@ -81,8 +81,8 @@
-type(zone() :: atom()). -type(zone() :: atom()).
-record(client, -record(client,
{ id :: client_id(), { client_id :: client_id(),
pid :: pid(), client_pid :: pid(),
zone :: zone(), zone :: zone(),
node :: node(), node :: node(),
username :: username(), username :: username(),

View File

@ -99,7 +99,7 @@ match_who(_Client, {user, all}) ->
true; true;
match_who(_Client, {client, all}) -> match_who(_Client, {client, all}) ->
true; true;
match_who(#client{id = ClientId}, {client, ClientId}) -> match_who(#client{client_id = ClientId}, {client, ClientId}) ->
true; true;
match_who(#client{username = Username}, {user, Username}) -> match_who(#client{username = Username}, {user, Username}) ->
true; true;
@ -137,9 +137,9 @@ feed_var(Client, Pattern) ->
feed_var(Client, Pattern, []). feed_var(Client, Pattern, []).
feed_var(_Client, [], Acc) -> feed_var(_Client, [], Acc) ->
lists:reverse(Acc); lists:reverse(Acc);
feed_var(Client = #client{id = undefined}, [<<"%c">>|Words], Acc) -> feed_var(Client = #client{client_id = undefined}, [<<"%c">>|Words], Acc) ->
feed_var(Client, Words, [<<"%c">>|Acc]); feed_var(Client, Words, [<<"%c">>|Acc]);
feed_var(Client = #client{id = ClientId}, [<<"%c">>|Words], Acc) -> feed_var(Client = #client{client_id = ClientId}, [<<"%c">>|Words], Acc) ->
feed_var(Client, Words, [ClientId |Acc]); feed_var(Client, Words, [ClientId |Acc]);
feed_var(Client = #client{username = undefined}, [<<"%u">>|Words], Acc) -> feed_var(Client = #client{username = undefined}, [<<"%u">>|Words], Acc) ->
feed_var(Client, Words, [<<"%u">>|Acc]); feed_var(Client, Words, [<<"%u">>|Acc]);

72
src/emqx_banned.erl Normal file
View File

@ -0,0 +1,72 @@
%%--------------------------------------------------------------------
%% Copyright © 2013-2018 EMQ Inc. All rights reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
%% Banned an IP Address, ClientId?
-module(emqx_banned).
-behaviour(gen_server).
%% API
-export([start_link/0]).
%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
-define(SERVER, ?MODULE).
-record(state, {}).
%%%===================================================================
%%% API
%%%===================================================================
%% @doc Starts the server
-spec(start_link() -> {ok, pid()} | ignore | {error, any()}).
start_link() ->
gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
%%%===================================================================
%%% gen_server callbacks
%%%===================================================================
init([]) ->
{ok, #state{}}.
handle_call(_Request, _From, State) ->
Reply = ok,
{reply, Reply, State}.
handle_cast(_Msg, State) ->
{noreply, State}.
handle_info(_Info, State) ->
{noreply, State}.
terminate(_Reason, _State) ->
ok.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
%%%===================================================================
%%% Internal functions
%%%===================================================================

74
src/emqx_flapping.erl Normal file
View File

@ -0,0 +1,74 @@
%%--------------------------------------------------------------------
%% Copyright (C) 2013-2018 EMQ Inc. All rights reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
%% 1. Flapping Detection
%% 2. Conflict Detection?
-module(emqx_flapping).
%% Use ets:update_counter???
-behaviour(gen_server).
-export([start_link/0]).
-export([is_banned/1, banned/1]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
-define(SERVER, ?MODULE).
-record(state, {}).
-spec(start_link() -> {ok, pid()} | ignore | {error, any()}).
start_link() ->
gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
is_banned(ClientId) ->
ets:member(banned, ClientId).
banned(ClientId) ->
ets:insert(banned, {ClientId, os:timestamp()}).
%%--------------------------------------------------------------------
%% gen_server callbacks
%%--------------------------------------------------------------------
init([]) ->
_ = ets:new(banned, [public, ordered_set, named_table]),
{ok, #state{}}.
handle_call(_Request, _From, State) ->
Reply = ok,
{reply, Reply, State}.
handle_cast(_Msg, State) ->
{noreply, State}.
handle_info(_Info, State) ->
{noreply, State}.
terminate(_Reason, _State) ->
ok.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
%%--------------------------------------------------------------------
%% Internal functions
%%--------------------------------------------------------------------

View File

@ -28,9 +28,9 @@ load(Env) ->
emqx:hook('client.connected', fun ?MODULE:on_client_connected/3, [Env]), emqx:hook('client.connected', fun ?MODULE:on_client_connected/3, [Env]),
emqx:hook('client.disconnected', fun ?MODULE:on_client_disconnected/3, [Env]). emqx:hook('client.disconnected', fun ?MODULE:on_client_disconnected/3, [Env]).
on_client_connected(ConnAck, Client = #client{id = ClientId, on_client_connected(ConnAck, Client = #client{client_id = ClientId,
username = Username, username = Username,
peername = {IpAddr, _} peername = {IpAddr, _}
%%clean_sess = CleanSess, %%clean_sess = CleanSess,
%%proto_ver = ProtoVer %%proto_ver = ProtoVer
}, Env) -> }, Env) ->
@ -49,7 +49,7 @@ on_client_connected(ConnAck, Client = #client{id = ClientId,
end, end,
{ok, Client}. {ok, Client}.
on_client_disconnected(Reason, #client{id = ClientId, on_client_disconnected(Reason, #client{client_id = ClientId,
username = Username}, Env) -> username = Username}, Env) ->
case catch emqx_json:encode([{clientid, ClientId}, case catch emqx_json:encode([{clientid, ClientId},
{username, Username}, {username, Username},

View File

@ -33,9 +33,9 @@
load(Topics) -> load(Topics) ->
emqx:hook('client.connected', fun ?MODULE:on_client_connected/3, [Topics]). emqx:hook('client.connected', fun ?MODULE:on_client_connected/3, [Topics]).
on_client_connected(?CONNACK_ACCEPT, Client = #client{id = ClientId, on_client_connected(?CONNACK_ACCEPT, Client = #client{client_id = ClientId,
pid = ClientPid, client_pid = ClientPid,
username = Username}, Topics) -> username = Username}, Topics) ->
Replace = fun(Topic) -> rep(<<"%u">>, Username, rep(<<"%c">>, ClientId, Topic)) end, Replace = fun(Topic) -> rep(<<"%u">>, Username, rep(<<"%c">>, ClientId, Topic)) end,
TopicTable = [{Replace(Topic), Qos} || {Topic, Qos} <- Topics], TopicTable = [{Replace(Topic), Qos} || {Topic, Qos} <- Topics],

View File

@ -122,8 +122,8 @@ client(#proto_state{client_id = ClientId,
WillMsg =:= undefined -> undefined; WillMsg =:= undefined -> undefined;
true -> WillMsg#message.topic true -> WillMsg#message.topic
end, end,
#client{id = ClientId, #client{client_id = ClientId,
pid = ClientPid, client_pid = ClientPid,
username = Username, username = Username,
peername = Peername, peername = Peername,
mountpoint = MountPoint}. mountpoint = MountPoint}.
@ -327,7 +327,7 @@ publish(Packet = ?PUBLISH_PACKET(?QOS_0, _PacketId),
mountpoint = MountPoint, mountpoint = MountPoint,
session = Session}) -> session = Session}) ->
Msg = emqx_packet:to_message(Packet), Msg = emqx_packet:to_message(Packet),
Msg1 = Msg#message{from = #client{id = ClientId, username = Username}}, Msg1 = Msg#message{from = #client{client_id = ClientId, username = Username}},
emqx_session:publish(Session, mount(replvar(MountPoint, State), Msg1)); emqx_session:publish(Session, mount(replvar(MountPoint, State), Msg1));
publish(Packet = ?PUBLISH_PACKET(?QOS_1, _PacketId), State) -> publish(Packet = ?PUBLISH_PACKET(?QOS_1, _PacketId), State) ->
@ -343,7 +343,7 @@ with_puback(Type, Packet = ?PUBLISH_PACKET(_Qos, PacketId),
session = Session}) -> session = Session}) ->
%% TODO: ... %% TODO: ...
Msg = emqx_packet:to_message(Packet), Msg = emqx_packet:to_message(Packet),
Msg1 = Msg#message{from = #client{id = ClientId, username = Username}}, Msg1 = Msg#message{from = #client{client_id = ClientId, username = Username}},
case emqx_session:publish(Session, mount(replvar(MountPoint, State), Msg1)) of case emqx_session:publish(Session, mount(replvar(MountPoint, State), Msg1)) of
ok -> ok ->
send(?PUBACK_PACKET(Type, PacketId), State); send(?PUBACK_PACKET(Type, PacketId), State);

View File

@ -153,9 +153,8 @@ cast(Router, Msg) ->
pick(Topic) -> pick(Topic) ->
gproc_pool:pick_worker(router, Topic). gproc_pool:pick_worker(router, Topic).
%%FIXME: OOM?
dump() -> dump() ->
[{route, [{To, Dest} || #route{topic = To, dest = Dest} <- ets:tab2list(route)]}]. ets:tab2list(route).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% gen_server callbacks %% gen_server callbacks

View File

@ -299,7 +299,7 @@ init(#{clean_start := CleanStart,
force_gc_count = ForceGcCount, force_gc_count = ForceGcCount,
ignore_loop_deliver = IgnoreLoopDeliver, ignore_loop_deliver = IgnoreLoopDeliver,
created_at = os:timestamp()}, created_at = os:timestamp()},
%%emqx_sm:register_session(ClientId, info(State)), emqx_sm:register_session(ClientId, self()),
emqx_hooks:run('session.created', [ClientId, Username]), emqx_hooks:run('session.created', [ClientId, Username]),
io:format("Session started: ~p~n", [self()]), io:format("Session started: ~p~n", [self()]),
{ok, emit_stats(State), hibernate}. {ok, emit_stats(State), hibernate}.

View File

@ -24,128 +24,134 @@
-export([open_session/1, lookup_session/1, close_session/1]). -export([open_session/1, lookup_session/1, close_session/1]).
-export([resume_session/1, discard_session/1]). -export([resume_session/1, discard_session/1]).
-export([register_session/1, unregister_session/1, unregister_session/2]). -export([register_session/1, register_session/2]).
-export([unregister_session/1, unregister_session/2]).
%% lock_session/1, create_session/1, unlock_session/1, %% Internal functions for rpc
-export([lookup/1, dispatch/3]).
-export([dispatch/3]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, -export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]). terminate/2, code_change/3]).
-record(state, {stats_fun, stats_timer, monitors = #{}}). -record(state, {stats, pids = #{}}).
-spec(start_link(StatsFun :: fun()) -> {ok, pid()} | ignore | {error, term()}). -spec(start_link(fun()) -> {ok, pid()} | ignore | {error, term()}).
start_link(StatsFun) -> start_link(StatsFun) ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [StatsFun], []). gen_server:start_link({local, ?MODULE}, ?MODULE, [StatsFun], []).
open_session(Session = #{client_id := ClientId, clean_start := true}) -> open_session(Attrs = #{clean_start := true,
with_lock(ClientId, client_id := ClientId, client_pid := ClientPid}) ->
fun() -> CleanStart = fun(_) ->
io:format("Nodelist: ~p~n", [ekka_membership:nodelist()]), discard_session(ClientId, ClientPid),
case rpc:multicall(ekka_membership:nodelist(), ?MODULE, discard_session, [ClientId]) of emqx_session_sup:start_session(Attrs)
{_Res, []} -> ok; end,
{_Res, BadNodes} -> emqx_log:error("[SM] Bad nodes found when lock a session: ~p", [BadNodes]) emqx_sm_locker:trans(ClientId, CleanStart);
open_session(Attrs = #{clean_start := false,
client_id := ClientId, client_pid := ClientPid}) ->
ResumeStart = fun(_) ->
case resume_session(ClientId, ClientPid) of
{ok, SessionPid} ->
{ok, SessionPid};
{error, not_found} ->
emqx_session_sup:start_session(Attrs);
{error, Reason} ->
{error, Reason}
end
end, end,
io:format("Begin to start session: ~p~n", [Session]), emqx_sm_locker:trans(ClientId, ResumeStart).
emqx_session_sup:start_session(Session)
end);
open_session(Session = #{client_id := ClientId, clean_start := false}) ->
with_lock(ClientId,
fun() ->
{ResL, _BadNodes} = rpc:multicall(ekka_membership:nodelist(), ?MODULE, lookup_session, [ClientId]),
case lists:flatten([Pid || Pid <- ResL, Pid =/= undefined]) of
[] ->
{ok, emqx_session_sup:start_session(Session)};
[SessPid|_] ->
case resume_session(SessPid) of
ok -> {ok, SessPid};
{error, Reason} ->
emqx_log:error("[SM] Failed to resume session: ~p, ~p", [Session, Reason]),
emqx_session_sup:start_session(Session)
end
end
end).
resume_session(SessPid) when node(SessPid) == node() ->
case is_process_alive(SessPid) of
true ->
emqx_session:resume(SessPid, self());
false ->
emqx_log:error("Cannot resume ~p which seems already dead!", [SessPid]),
{error, session_died}
end;
resume_session(SessPid) ->
case rpc:call(node(SessPid), emqx_session, resume, [SessPid]) of
ok -> {ok, SessPid};
{badrpc, Reason} ->
{error, Reason};
{error, Reason} ->
{error, Reason}
end.
discard_session(ClientId) -> discard_session(ClientId) ->
discard_session(ClientId, self()).
discard_session(ClientId, ClientPid) ->
lists:foreach(fun({_, SessionPid}) ->
catch emqx_session:discard(SessionPid, ClientPid)
end, lookup_session(ClientId)).
resume_session(ClientId) ->
resume_session(ClientId, self()).
resume_session(ClientId, ClientPid) ->
case lookup_session(ClientId) of case lookup_session(ClientId) of
undefined -> ok; [] -> {error, not_found};
Pid -> emqx_session:discard(Pid) [{_, SessionPid}] ->
ok = emqx_session:resume(SessionPid, ClientPid),
{ok, SessionPid};
[{_, SessionPid}|_More] = Sessions ->
emqx_log:error("[SM] More than one session found: ~p", [Sessions]),
ok = emqx_session:resume(SessionPid, ClientPid),
{ok, SessionPid}
end. end.
lookup_session(ClientId) -> lookup_session(ClientId) ->
try ets:lookup_element(session, ClientId, 2) catch error:badarg -> undefined end. {ResL, _} = multicall(?MODULE, lookup, [ClientId]),
lists:append(ResL).
close_session(SessPid) -> close_session(ClientId) ->
emqx_session:close(SessPid). lists:foreach(fun(#session{pid = SessionPid}) ->
emqx_session:close(SessionPid)
end, lookup_session(ClientId)).
with_lock(ClientId, Fun) ->
case emqx_sm_locker:lock(ClientId) of
true -> Result = Fun(),
emqx_sm_locker:unlock(ClientId),
Result;
false -> {error, client_id_unavailable};
{error, Reason} -> {error, Reason}
end.
-spec(register_session(client_id()) -> true).
register_session(ClientId) -> register_session(ClientId) ->
ets:insert(session, {ClientId, self()}). register_session(ClientId, self()).
register_session(ClientId, SessionPid) ->
ets:insert(session, {ClientId, SessionPid}).
unregister_session(ClientId) -> unregister_session(ClientId) ->
unregister_session(ClientId, self()). unregister_session(ClientId, self()).
unregister_session(ClientId, Pid) -> unregister_session(ClientId, SessionPid) ->
case ets:lookup(session, ClientId) of case ets:lookup(session, ClientId) of
[{_, Pid}] -> [Session = {ClientId, SessionPid}] ->
ets:delete_object(session, {ClientId, Pid}); ets:delete(session_attrs, Session),
ets:delete(session_stats, Session),
ets:delete_object(session, Session);
_ -> _ ->
false false
end. end.
dispatch(ClientId, Topic, Msg) -> dispatch(ClientId, Topic, Msg) ->
case lookup_session(ClientId) of case lookup(ClientId) of
Pid when is_pid(Pid) -> [{_, Pid}] ->
Pid ! {dispatch, Topic, Msg}; Pid ! {dispatch, Topic, Msg};
undefined -> [] ->
emqx_hooks:run('message.dropped', [ClientId, Msg]) emqx_hooks:run('message.dropped', [ClientId, Msg])
end. end.
lookup(ClientId) ->
ets:lookup(session, ClientId).
multicall(Mod, Fun, Args) ->
multicall(ekka:nodelist(up), Mod, Fun, Args).
multicall([Node], Mod, Fun, Args) when Node == node() ->
Res = erlang:apply(Mod, Fun, Args), [Res];
multicall(Nodes, Mod, Fun, Args) ->
{ResL, _} = emqx_rpc:multicall(Nodes, Mod, Fun, Args),
ResL.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% gen_server callbacks %% gen_server callbacks
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
init([StatsFun]) -> init([StatsFun]) ->
{ok, sched_stats(StatsFun, #state{pids = #{}})}.
sched_stats(Fun, State) ->
{ok, TRef} = timer:send_interval(timer:seconds(1), stats), {ok, TRef} = timer:send_interval(timer:seconds(1), stats),
{ok, #state{stats_fun = StatsFun, stats_timer = TRef}}. State#state{stats = #{func => Fun, timer => TRef}}.
handle_call(Req, _From, State) -> handle_call(Req, _From, State) ->
emqx_log:error("[SM] Unexpected request: ~p", [Req]), emqx_log:error("[SM] Unexpected request: ~p", [Req]),
{reply, ignore, State}. {reply, ignore, State}.
handle_cast({monitor_session, SessionPid, ClientId}, handle_cast({registered, ClientId, SessionPid},
State = #state{monitors = Monitors}) -> State = #state{pids = Pids}) ->
MRef = erlang:monitor(process, SessionPid), _ = erlang:monitor(process, SessionPid),
{noreply, State#state{monitors = maps:put(MRef, ClientId, Monitors)}}; {noreply, State#state{pids = maps:put(SessionPid, ClientId, Pids)}};
handle_cast(Msg, State) -> handle_cast(Msg, State) ->
emqx_log:error("[SM] Unexpected msg: ~p", [Msg]), emqx_log:error("[SM] Unexpected msg: ~p", [Msg]),
@ -154,14 +160,14 @@ handle_cast(Msg, State) ->
handle_info(stats, State) -> handle_info(stats, State) ->
{noreply, setstats(State), hibernate}; {noreply, setstats(State), hibernate};
handle_info({'DOWN', MRef, process, DownPid, _Reason}, handle_info({'DOWN', _MRef, process, DownPid, _Reason},
State = #state{monitors = Monitors}) -> State = #state{pids = Pids}) ->
case maps:find(MRef, Monitors) of case maps:find(DownPid, Pids) of
{ok, {ClientId, Pid}} -> {ok, ClientId} ->
ets:delete_object(session, {ClientId, Pid}), unregister_session(ClientId, DownPid),
{noreply, setstats(State#state{monitors = maps:remove(MRef, Monitors)})}; {noreply, State#state{pids = maps:remove(DownPid, Pids)}};
error -> error ->
emqx_log:error("session ~p not found", [DownPid]), emqx_log:error("[SM] Session ~p not found", [DownPid]),
{noreply, State} {noreply, State}
end; end;
@ -169,7 +175,7 @@ handle_info(Info, State) ->
emqx_log:error("[SM] Unexpected info: ~p", [Info]), emqx_log:error("[SM] Unexpected info: ~p", [Info]),
{noreply, State}. {noreply, State}.
terminate(_Reason, _State = #state{stats_timer = TRef}) -> terminate(_Reason, _State = #state{stats = #{timer := TRef}}) ->
timer:cancel(TRef). timer:cancel(TRef).
code_change(_OldVsn, State, _Extra) -> code_change(_OldVsn, State, _Extra) ->
@ -179,6 +185,6 @@ code_change(_OldVsn, State, _Extra) ->
%% Internal functions %% Internal functions
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
setstats(State = #state{stats_fun = StatsFun}) -> setstats(State = #state{stats = #{func := Fun}}) ->
StatsFun(ets:info(session, size)), State. Fun(ets:info(session, size)), State.

View File

@ -18,16 +18,44 @@
-include("emqx.hrl"). -include("emqx.hrl").
%% Lock/Unlock API based on canal-lock. -export([start_link/0]).
-export([lock/1, unlock/1]).
%% @doc Lock a clientid -export([trans/2, trans/3]).
-spec(lock(client_id()) -> boolean() | {error, term()}).
-export([lock/1, lock/2, unlock/1]).
-spec(start_link() -> {ok, pid()} | ignore | {error, term()}).
start_link() ->
ekka_locker:start_link(?MODULE).
-spec(trans(client_id(), fun(([node()]) -> any())) -> any()).
trans(ClientId, Fun) ->
trans(ClientId, Fun, undefined).
-spec(trans(client_id(), fun(([node()]) -> any()),
ekka_locker:piggyback()) -> any()).
trans(ClientId, Fun, Piggyback) ->
case lock(ClientId, Piggyback) of
{true, Nodes} ->
try Fun(Nodes) after unlock(ClientId) end;
{false, _Nodes} ->
{error, client_id_unavailable}
end.
-spec(lock(client_id()) -> ekka_locker:lock_result()).
lock(ClientId) -> lock(ClientId) ->
rpc:call(ekka_membership:leader(), emqx_locker, lock, [ClientId]). ekka_locker:aquire(?MODULE, ClientId, strategy()).
%% @doc Unlock a clientid -spec(lock(client_id(), ekka_locker:piggyback())
-spec(unlock(client_id()) -> ok). -> ekka_locker:lock_result()).
lock(ClientId, Piggyback) ->
ekka_locker:aquire(?MODULE, ClientId, strategy(), Piggyback).
-spec(unlock(client_id()) -> {boolean(), [node()]}).
unlock(ClientId) -> unlock(ClientId) ->
rpc:call(ekka_membership:leader(), emqx_locker, unlock, [ClientId]). ekka_locker:release(?MODULE, ClientId, strategy()).
-spec(strategy() -> local | one | quorum | all).
strategy() ->
application:get_env(emqx, session_locking_strategy, quorum).

View File

@ -51,7 +51,7 @@ start_link() ->
trace(publish, From, _Msg) when is_atom(From) -> trace(publish, From, _Msg) when is_atom(From) ->
%% Dont' trace '$SYS' publish %% Dont' trace '$SYS' publish
ignore; ignore;
trace(publish, #client{id = ClientId, username = Username}, trace(publish, #client{client_id = ClientId, username = Username},
#message{topic = Topic, payload = Payload}) -> #message{topic = Topic, payload = Payload}) ->
lager:info([{client, ClientId}, {topic, Topic}], lager:info([{client, ClientId}, {topic, Topic}],
"~s/~s PUBLISH to ~s: ~p", [ClientId, Username, Topic, Payload]); "~s/~s PUBLISH to ~s: ~p", [ClientId, Username, Topic, Payload]);