Merge pull request #1558 from emqtt/develop

Version 2.3.7
This commit is contained in:
huangdan 2018-04-21 15:55:26 +08:00 committed by GitHub
commit f5b77f2e7b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 78 additions and 41 deletions

View File

@ -9,7 +9,7 @@ dep_gproc = git https://github.com/uwiger/gproc
dep_getopt = git https://github.com/jcomellas/getopt v0.8.2 dep_getopt = git https://github.com/jcomellas/getopt v0.8.2
dep_lager = git https://github.com/basho/lager master dep_lager = git https://github.com/basho/lager master
dep_esockd = git https://github.com/emqtt/esockd v5.2.1 dep_esockd = git https://github.com/emqtt/esockd v5.2.1
dep_ekka = git https://github.com/emqtt/ekka v0.2.2 dep_ekka = git https://github.com/emqtt/ekka v0.2.3
dep_mochiweb = git https://github.com/emqtt/mochiweb v4.2.2 dep_mochiweb = git https://github.com/emqtt/mochiweb v4.2.2
dep_pbkdf2 = git https://github.com/emqtt/pbkdf2 2.0.1 dep_pbkdf2 = git https://github.com/emqtt/pbkdf2 2.0.1
dep_lager_syslog = git https://github.com/basho/lager_syslog dep_lager_syslog = git https://github.com/basho/lager_syslog

View File

@ -135,6 +135,12 @@ cluster.autoclean = 5m
## Value: String ## Value: String
## cluster.k8s.app_name = emq ## cluster.k8s.app_name = emq
## Kubernates Namespace
##
## Value: String
## cluster.k8s.namespace = default
##-------------------------------------------------------------------- ##--------------------------------------------------------------------
## Node Args ## Node Args
##-------------------------------------------------------------------- ##--------------------------------------------------------------------

View File

@ -124,6 +124,10 @@
{datatype, string} {datatype, string}
]}. ]}.
{mapping, "cluster.k8s.namespace", "ekka.cluster_discovery", [
{datatype, string}
]}.
{translation, "ekka.cluster_discovery", fun(Conf) -> {translation, "ekka.cluster_discovery", fun(Conf) ->
Strategy = cuttlefish:conf_get("cluster.discovery", Conf), Strategy = cuttlefish:conf_get("cluster.discovery", Conf),
Filter = fun(Opts) -> [{K, V} || {K, V} <- Opts, V =/= undefined] end, Filter = fun(Opts) -> [{K, V} || {K, V} <- Opts, V =/= undefined] end,
@ -152,7 +156,8 @@
[{apiserver, cuttlefish:conf_get("cluster.k8s.apiserver", Conf)}, [{apiserver, cuttlefish:conf_get("cluster.k8s.apiserver", Conf)},
{service_name, cuttlefish:conf_get("cluster.k8s.service_name", Conf)}, {service_name, cuttlefish:conf_get("cluster.k8s.service_name", Conf)},
{address_type, cuttlefish:conf_get("cluster.k8s.address_type", Conf, ip)}, {address_type, cuttlefish:conf_get("cluster.k8s.address_type", Conf, ip)},
{app_name, cuttlefish:conf_get("cluster.k8s.app_name", Conf)}]; {app_name, cuttlefish:conf_get("cluster.k8s.app_name", Conf)},
{namespace, cuttlefish:conf_get("cluster.k8s.namespace", Conf)}];
(manual) -> (manual) ->
[ ] [ ]
end, end,
@ -434,7 +439,7 @@ end}.
ConsoleLogLevel = cuttlefish:conf_get("log.console.level", Conf), ConsoleLogLevel = cuttlefish:conf_get("log.console.level", Conf),
ConsoleLogFile = cuttlefish:conf_get("log.console.file", Conf), ConsoleLogFile = cuttlefish:conf_get("log.console.file", Conf),
ConsoleHandler = {lager_console_backend, ConsoleLogLevel}, ConsoleHandler = {lager_console_backend, [ConsoleLogLevel]},
ConsoleFileHandler = {lager_file_backend, [{file, ConsoleLogFile}, ConsoleFileHandler = {lager_file_backend, [{file, ConsoleLogFile},
{level, ConsoleLogLevel}, {level, ConsoleLogLevel},
{size, cuttlefish:conf_get("log.console.size", Conf)}, {size, cuttlefish:conf_get("log.console.size", Conf)},

View File

@ -1,6 +1,6 @@
{application,emqttd, {application,emqttd,
[{description,"Erlang MQTT Broker"}, [{description,"Erlang MQTT Broker"},
{vsn,"2.3.6"}, {vsn,"2.3.7"},
{modules,[]}, {modules,[]},
{registered,[emqttd_sup]}, {registered,[emqttd_sup]},
{applications,[kernel,stdlib,gproc,lager,esockd,mochiweb, {applications,[kernel,stdlib,gproc,lager,esockd,mochiweb,

View File

@ -93,7 +93,7 @@ format_variable(#mqtt_packet_connect{
format_variable(#mqtt_packet_connack{ack_flags = AckFlags, format_variable(#mqtt_packet_connack{ack_flags = AckFlags,
return_code = ReturnCode}) -> return_code = ReturnCode}) ->
io_lib:format("AckFlags=~p, RetainCode=~p", [AckFlags, ReturnCode]); io_lib:format("AckFlags=~p, ReturnCode=~p", [AckFlags, ReturnCode]);
format_variable(#mqtt_packet_publish{topic_name = TopicName, format_variable(#mqtt_packet_publish{topic_name = TopicName,
packet_id = PacketId}) -> packet_id = PacketId}) ->

View File

@ -162,11 +162,12 @@ received(Packet = ?PACKET(Type), State = #proto_state{stats_data = Stats}) ->
subscribe(RawTopicTable, ProtoState = #proto_state{client_id = ClientId, subscribe(RawTopicTable, ProtoState = #proto_state{client_id = ClientId,
username = Username, username = Username,
session = Session}) -> session = Session,
mountpoint = MountPoint}) ->
TopicTable = parse_topic_table(RawTopicTable), TopicTable = parse_topic_table(RawTopicTable),
case emqttd_hooks:run('client.subscribe', [ClientId, Username], TopicTable) of case emqttd_hooks:run('client.subscribe', [ClientId, Username], TopicTable) of
{ok, TopicTable1} -> {ok, TopicTable1} ->
emqttd_session:subscribe(Session, TopicTable1); emqttd_session:subscribe(Session, mount(MountPoint, TopicTable1));
{stop, _} -> {stop, _} ->
ok ok
end, end,
@ -174,10 +175,11 @@ subscribe(RawTopicTable, ProtoState = #proto_state{client_id = ClientId,
unsubscribe(RawTopics, ProtoState = #proto_state{client_id = ClientId, unsubscribe(RawTopics, ProtoState = #proto_state{client_id = ClientId,
username = Username, username = Username,
session = Session}) -> session = Session,
mountpoint = MountPoint}) ->
case emqttd_hooks:run('client.unsubscribe', [ClientId, Username], parse_topics(RawTopics)) of case emqttd_hooks:run('client.unsubscribe', [ClientId, Username], parse_topics(RawTopics)) of
{ok, TopicTable} -> {ok, TopicTable} ->
emqttd_session:unsubscribe(Session, TopicTable); emqttd_session:unsubscribe(Session, mount(MountPoint, TopicTable));
{stop, _} -> {stop, _} ->
ok ok
end, end,
@ -228,7 +230,8 @@ process(?CONNECT_PACKET(Var), State0) ->
%% ACCEPT %% ACCEPT
{?CONNACK_ACCEPT, SP, State2#proto_state{session = Session, is_superuser = IsSuperuser}}; {?CONNACK_ACCEPT, SP, State2#proto_state{session = Session, is_superuser = IsSuperuser}};
{error, Error} -> {error, Error} ->
{stop, {shutdown, Error}, State2} ?LOG(error, "Username '~s' login failed for ~p", [Username, Error], State2),
{?CONNACK_SERVER, false, State2}
end; end;
{error, Reason}-> {error, Reason}->
?LOG(error, "Username '~s' login failed for ~p", [Username, Reason], State1), ?LOG(error, "Username '~s' login failed for ~p", [Username, Reason], State1),
@ -593,4 +596,3 @@ unmount(MountPoint, Msg = #mqtt_message{topic = Topic}) ->
{MountPoint, Topic0} -> Msg#mqtt_message{topic = Topic0}; {MountPoint, Topic0} -> Msg#mqtt_message{topic = Topic0};
_ -> Msg _ -> Msg
end. end.

View File

@ -92,7 +92,7 @@ publish(Msg = #mqtt_message{from = From}) ->
{ok, Msg1 = #mqtt_message{topic = Topic}} -> {ok, Msg1 = #mqtt_message{topic = Topic}} ->
emqttd_pubsub:publish(Topic, Msg1); emqttd_pubsub:publish(Topic, Msg1);
{stop, Msg1} -> {stop, Msg1} ->
lager:warning("Stop publishing: ~s", [emqttd_message:format(Msg1)]), lager:info("Stop publishing: ~s", [emqttd_message:format(Msg1)]),
ignore ignore
end. end.

View File

@ -720,7 +720,7 @@ enqueue_msg(Msg, State = #state{mqueue = Q}) ->
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
redeliver(Msg = #mqtt_message{qos = QoS}, State) -> redeliver(Msg = #mqtt_message{qos = QoS}, State) ->
deliver(Msg#mqtt_message{dup = if QoS =:= ?QOS2 -> false; true -> true end}, State); deliver(Msg#mqtt_message{dup = if QoS =:= ?QOS0 -> false; true -> true end}, State);
redeliver({pubrel, PacketId}, #state{client_pid = Pid}) -> redeliver({pubrel, PacketId}, #state{client_pid = Pid}) ->
Pid ! {redeliver, {?PUBREL, PacketId}}. Pid ! {redeliver, {?PUBREL, PacketId}}.
@ -853,4 +853,3 @@ shutdown(Reason, State) ->
gc(State) -> gc(State) ->
emqttd_gc:maybe_force_gc(#state.force_gc_count, State). emqttd_gc:maybe_force_gc(#state.force_gc_count, State).

View File

@ -183,15 +183,16 @@ handle_cast(Msg, State) ->
handle_info({'DOWN', MRef, process, DownPid, _Reason}, State) -> handle_info({'DOWN', MRef, process, DownPid, _Reason}, State) ->
case dict:find(MRef, State#state.monitors) of case dict:find(MRef, State#state.monitors) of
{ok, ClientId} -> {ok, ClientId} ->
NewState =
case mnesia:dirty_read({mqtt_session, ClientId}) of case mnesia:dirty_read({mqtt_session, ClientId}) of
[] -> [] -> State;
ok;
[Sess = #mqtt_session{sess_pid = DownPid}] -> [Sess = #mqtt_session{sess_pid = DownPid}] ->
mnesia:dirty_delete_object(Sess); mnesia:dirty_delete_object(Sess),
erase_monitor(MRef, State);
[_Sess] -> [_Sess] ->
ok State
end, end,
{noreply, erase_monitor(MRef, State), hibernate}; {noreply, NewState, hibernate};
error -> error ->
lager:error("MRef of session ~p not found", [DownPid]), lager:error("MRef of session ~p not found", [DownPid]),
{noreply, State} {noreply, State}
@ -256,6 +257,7 @@ resume_session(Session = #mqtt_session{client_id = ClientId, sess_pid = SessPid}
{ok, SessPid}; {ok, SessPid};
false -> false ->
?LOG(error, "Cannot resume ~p which seems already dead!", [SessPid], Session), ?LOG(error, "Cannot resume ~p which seems already dead!", [SessPid], Session),
remove_session(Session),
{error, session_died} {error, session_died}
end; end;
@ -305,4 +307,3 @@ monitor_session(ClientId, SessPid, State = #state{monitors = Monitors}) ->
erase_monitor(MRef, State = #state{monitors = Monitors}) -> erase_monitor(MRef, State = #state{monitors = Monitors}) ->
erlang:demonitor(MRef, [flush]), erlang:demonitor(MRef, [flush]),
State#state{monitors = dict:erase(MRef, Monitors)}. State#state{monitors = dict:erase(MRef, Monitors)}.

View File

@ -143,7 +143,7 @@ setstat(Stat, Val) ->
ets:update_element(?STATS_TAB, Stat, {2, Val}). ets:update_element(?STATS_TAB, Stat, {2, Val}).
%% @doc Set stats with max %% @doc Set stats with max
-spec(setstats(Stat :: atom(), MaxStat :: atom(), Val :: pos_integer()) -> boolean()). -spec(setstats(Stat :: atom(), MaxStat :: atom(), Val :: pos_integer()) -> ok).
setstats(Stat, MaxStat, Val) -> setstats(Stat, MaxStat, Val) ->
gen_server:cast(?MODULE, {setstats, Stat, MaxStat, Val}). gen_server:cast(?MODULE, {setstats, Stat, MaxStat, Val}).

View File

@ -156,9 +156,9 @@ microsecs() ->
(Mega * 1000000 + Sec) * 1000000 + Micro. (Mega * 1000000 + Sec) * 1000000 + Micro.
loads() -> loads() ->
[{load1, ftos(cpu_sup:avg1()/256)}, [{load1, ftos(avg1()/256)},
{load5, ftos(cpu_sup:avg5()/256)}, {load5, ftos(avg5()/256)},
{load15, ftos(cpu_sup:avg15()/256)}]. {load15, ftos(avg15()/256)}].
get_system_info() -> get_system_info() ->
[{Key, format_system_info(Key, get_system_info(Key))} || Key <- ?SYSTEM_INFO]. [{Key, format_system_info(Key, get_system_info(Key))} || Key <- ?SYSTEM_INFO].
@ -427,3 +427,27 @@ mapping([{owner, V}|Entries], Acc) when is_pid(V) ->
mapping([{Key, Value}|Entries], Acc) -> mapping([{Key, Value}|Entries], Acc) ->
mapping(Entries, [{Key, Value}|Acc]). mapping(Entries, [{Key, Value}|Acc]).
avg1() ->
case cpu_sup:avg1() of
SystemLoad when is_integer(SystemLoad) ->
SystemLoad;
{error, Reason} ->
lager:error("Get the average system load in the last minute fail for ~p~n", [Reason]),
0.00
end.
avg5() ->
case cpu_sup:avg5() of
SystemLoad when is_integer(SystemLoad) ->
SystemLoad;
{error, Reason} ->
lager:error("Get the average system load in the last 5 minutes fail for ~p~n", [Reason]),
0.00
end.
avg15() ->
case cpu_sup:avg15() of
SystemLoad when is_integer(SystemLoad) ->
SystemLoad;
{error, Reason} ->
lager:error("Get the average system load in the last 15 minutes fail for ~p~n", [Reason]),
0.00
end.