Skip to content

Commit 5589b13

Browse files
committed
for bug #241, support mr(merged-read) config and reload. 2.0.52.
1 parent 57f844b commit 5589b13

18 files changed

+293
-48
lines changed

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -485,6 +485,7 @@ Supported operating systems and hardware:
485485
* 2013-10-17, Created.<br/>
486486

487487
## History
488+
* v2.0, 2014-12-04, for [#241](https://github.com/winlinvip/simple-rtmp-server/issues/241), support mr(merged-read) config and reload. 2.0.52.
488489
* v2.0, 2014-12-04, enable [#241](https://github.com/winlinvip/simple-rtmp-server/issues/241) and [#248](https://github.com/winlinvip/simple-rtmp-server/issues/248), +25% performance, 2.5k publisher. 2.0.50
489490
* v2.0, 2014-12-04, fix [#248](https://github.com/winlinvip/simple-rtmp-server/issues/248), improve about 15% performance for fast buffer. 2.0.49
490491
* v2.0, 2014-12-03, fix [#244](https://github.com/winlinvip/simple-rtmp-server/issues/244), conn thread use cond to wait for recv thread error. 2.0.47.

trunk/conf/full.conf

100644100755
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,26 @@ http_stream {
142142
vhost __defaultVhost__ {
143143
}
144144

145+
# the MR(merged-read) setting for publisher.
146+
vhost mr.srs.com {
147+
# about MR, read https://github.com/winlinvip/simple-rtmp-server/issues/241
148+
mr {
149+
# whether enable the MR(merged-read)
150+
# default: off
151+
enabled on;
152+
# the latency in ms for MR(merged-read),
153+
# the performance+ when latency+, and memory+,
154+
# memory(buffer) = latency * kbps / 8
155+
# for example, latency=500ms, kbps=3000kbps, each publish connection will consume
156+
# memory = 500 * 3000 / 8 = 187500B = 183KB
157+
# when there are 2500 publisher, the total memory of SRS atleast:
158+
# 183KB * 2500 = 446MB
159+
# the value recomment is [300, 2000]
160+
# default: 500
161+
latency 500;
162+
}
163+
}
164+
145165
# vhost for edge, edge and origin is the same vhost
146166
vhost same.edge.srs.com {
147167
# the mode of vhost, local or remote.

trunk/src/app/srs_app_config.cpp

Lines changed: 67 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -816,7 +816,18 @@ int SrsConfig::reload_vhost(SrsConfDirective* old_root)
816816
return ret;
817817
}
818818
}
819-
srs_trace("vhost %s reload hls success.", vhost.c_str());
819+
srs_trace("vhost %s reload hlsdvrsuccess.", vhost.c_str());
820+
}
821+
// mr, only one per vhost
822+
if (!srs_directive_equals(new_vhost->get("mr"), old_vhost->get("mr"))) {
823+
for (it = subscribes.begin(); it != subscribes.end(); ++it) {
824+
ISrsReloadHandler* subscribe = *it;
825+
if ((ret = subscribe->on_reload_vhost_mr(vhost)) != ERROR_SUCCESS) {
826+
srs_error("vhost %s notify subscribes mr failed. ret=%d", vhost.c_str(), ret);
827+
return ret;
828+
}
829+
}
830+
srs_trace("vhost %s reload mr success.", vhost.c_str());
820831
}
821832
// http, only one per vhost.
822833
if (!srs_directive_equals(new_vhost->get("http"), old_vhost->get("http"))) {
@@ -1316,6 +1327,7 @@ int SrsConfig::check_config()
13161327
&& n != "time_jitter"
13171328
&& n != "atc" && n != "atc_auto"
13181329
&& n != "debug_srs_upnode"
1330+
&& n != "mr"
13191331
) {
13201332
ret = ERROR_SYSTEM_CONFIG_INVALID;
13211333
srs_error("unsupported vhost directive %s, ret=%d", n.c_str(), ret);
@@ -1333,6 +1345,16 @@ int SrsConfig::check_config()
13331345
return ret;
13341346
}
13351347
}
1348+
} else if (n == "mr") {
1349+
for (int j = 0; j < (int)conf->directives.size(); j++) {
1350+
string m = conf->at(j)->name.c_str();
1351+
if (m != "enabled" && m != "latency"
1352+
) {
1353+
ret = ERROR_SYSTEM_CONFIG_INVALID;
1354+
srs_error("unsupported vhost mr directive %s, ret=%d", m.c_str(), ret);
1355+
return ret;
1356+
}
1357+
}
13361358
} else if (n == "ingest") {
13371359
for (int j = 0; j < (int)conf->directives.size(); j++) {
13381360
string m = conf->at(j)->name.c_str();
@@ -2078,6 +2100,50 @@ int SrsConfig::get_chunk_size(string vhost)
20782100
return ::atoi(conf->arg0().c_str());
20792101
}
20802102

2103+
bool SrsConfig::get_mr_enabled(string vhost)
2104+
{
2105+
2106+
SrsConfDirective* conf = get_vhost(vhost);
2107+
2108+
if (!conf) {
2109+
return SRS_CONSTS_RTMP_MR;
2110+
}
2111+
2112+
conf = conf->get("mr");
2113+
if (!conf) {
2114+
return SRS_CONSTS_RTMP_MR;
2115+
}
2116+
2117+
conf = conf->get("enabled");
2118+
if (!conf || conf->arg0() != "on") {
2119+
return SRS_CONSTS_RTMP_MR;
2120+
}
2121+
2122+
return true;
2123+
}
2124+
2125+
int SrsConfig::get_mr_sleep_ms(string vhost)
2126+
{
2127+
2128+
SrsConfDirective* conf = get_vhost(vhost);
2129+
2130+
if (!conf) {
2131+
return SRS_CONSTS_RTMP_MR_SLEEP;
2132+
}
2133+
2134+
conf = conf->get("mr");
2135+
if (!conf) {
2136+
return SRS_CONSTS_RTMP_MR_SLEEP;
2137+
}
2138+
2139+
conf = conf->get("latency");
2140+
if (!conf || conf->arg0().empty()) {
2141+
return SRS_CONSTS_RTMP_MR_SLEEP;
2142+
}
2143+
2144+
return ::atoi(conf->arg0().c_str());
2145+
}
2146+
20812147
int SrsConfig::get_global_chunk_size()
20822148
{
20832149
SrsConfDirective* conf = root->get("chunk_size");

trunk/src/app/srs_app_config.hpp

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -530,6 +530,16 @@ class SrsConfig
530530
* @remark, default 60000.
531531
*/
532532
virtual int get_chunk_size(std::string vhost);
533+
/**
534+
* whether mr is enabled for vhost.
535+
* @param vhost, the vhost to get the mr.
536+
*/
537+
virtual bool get_mr_enabled(std::string vhost);
538+
/**
539+
* get the mr sleep time in ms for vhost.
540+
* @param vhost, the vhost to get the mr sleep time.
541+
*/
542+
virtual int get_mr_sleep_ms(std::string vhost);
533543
private:
534544
/**
535545
* get the global chunk size.

trunk/src/app/srs_app_recv_thread.cpp

Lines changed: 90 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,9 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
2929
#include <srs_protocol_buffer.hpp>
3030
#include <srs_kernel_utility.hpp>
3131
#include <srs_core_performance.hpp>
32+
#include <srs_app_config.hpp>
33+
34+
using namespace std;
3235

3336
ISrsMessageHandler::ISrsMessageHandler()
3437
{
@@ -221,11 +224,13 @@ void SrsQueueRecvThread::on_thread_stop()
221224
}
222225

223226
SrsPublishRecvThread::SrsPublishRecvThread(
224-
SrsRtmpServer* rtmp_sdk, int fd, int timeout_ms,
227+
SrsRtmpServer* rtmp_sdk,
228+
SrsRequest* _req, int mr_sock_fd, int timeout_ms,
225229
SrsRtmpConn* conn, SrsSource* source, bool is_fmle, bool is_edge
226230
): trd(this, rtmp_sdk, timeout_ms)
227231
{
228232
rtmp = rtmp_sdk;
233+
229234
_conn = conn;
230235
_source = source;
231236
_is_fmle = is_fmle;
@@ -234,12 +239,22 @@ SrsPublishRecvThread::SrsPublishRecvThread(
234239
recv_error_code = ERROR_SUCCESS;
235240
_nb_msgs = 0;
236241
error = st_cond_new();
242+
243+
req = _req;
244+
mr_fd = mr_sock_fd;
237245

238-
mr_fd = fd;
246+
// the mr settings,
247+
// @see https://github.com/winlinvip/simple-rtmp-server/issues/241
248+
mr = _srs_config->get_mr_enabled(req->vhost);
249+
mr_sleep = _srs_config->get_mr_sleep_ms(req->vhost);
250+
251+
_srs_config->subscribe(this);
239252
}
240253

241254
SrsPublishRecvThread::~SrsPublishRecvThread()
242255
{
256+
_srs_config->unsubscribe(this);
257+
243258
trd.stop();
244259
st_cond_destroy(error);
245260
}
@@ -282,20 +297,8 @@ void SrsPublishRecvThread::on_thread_start()
282297
// for the main thread never send message.
283298

284299
#ifdef SRS_PERF_MERGED_READ
285-
// socket recv buffer, system will double it.
286-
int nb_rbuf = SRS_MR_SOCKET_BUFFER / 2;
287-
socklen_t sock_buf_size = sizeof(int);
288-
if (setsockopt(mr_fd, SOL_SOCKET, SO_RCVBUF, &nb_rbuf, sock_buf_size) < 0) {
289-
srs_warn("set sock SO_RCVBUF=%d failed.", nb_rbuf);
290-
}
291-
getsockopt(mr_fd, SOL_SOCKET, SO_RCVBUF, &nb_rbuf, &sock_buf_size);
292-
293-
srs_trace("merged read sockbuf=%d, actual=%d, sleep %d when nread<=%d",
294-
SRS_MR_SOCKET_BUFFER, nb_rbuf, SRS_MR_MAX_SLEEP_MS, SRS_MR_SMALL_BYTES);
295-
296-
// enable the merge read
297-
// @see https://github.com/winlinvip/simple-rtmp-server/issues/241
298-
rtmp->set_merge_read(true, this);
300+
// for mr.
301+
update_buffer(mr, mr_sleep);
299302
#endif
300303
}
301304

@@ -349,7 +352,11 @@ void SrsPublishRecvThread::on_recv_error(int ret)
349352
#ifdef SRS_PERF_MERGED_READ
350353
void SrsPublishRecvThread::on_read(ssize_t nread)
351354
{
352-
if (nread < 0 || SRS_MR_MAX_SLEEP_MS <= 0) {
355+
if (!mr) {
356+
return;
357+
}
358+
359+
if (nread < 0 || mr_sleep <= 0) {
353360
return;
354361
}
355362

@@ -360,7 +367,72 @@ void SrsPublishRecvThread::on_read(ssize_t nread)
360367
* @see https://github.com/winlinvip/simple-rtmp-server/issues/241
361368
*/
362369
if (nread < SRS_MR_SMALL_BYTES) {
363-
st_usleep(SRS_MR_MAX_SLEEP_MS * 1000);
370+
st_usleep(mr_sleep * 1000);
364371
}
365372
}
366373
#endif
374+
375+
int SrsPublishRecvThread::on_reload_vhost_mr(string vhost)
376+
{
377+
int ret = ERROR_SUCCESS;
378+
379+
// the mr settings,
380+
// @see https://github.com/winlinvip/simple-rtmp-server/issues/241
381+
bool mr_enabled = _srs_config->get_mr_enabled(req->vhost);
382+
int sleep_ms = _srs_config->get_mr_sleep_ms(req->vhost);
383+
update_buffer(mr_enabled, sleep_ms);
384+
385+
return ret;
386+
}
387+
388+
void SrsPublishRecvThread::update_buffer(bool mr_enabled, int sleep_ms)
389+
{
390+
// TODO: FIXME: refine it.
391+
392+
#ifdef SRS_PERF_MERGED_READ
393+
// previous enabled mr, update the buffer.
394+
if (mr && mr_sleep != sleep_ms) {
395+
// the underlayer api will set to SRS_MR_SOCKET_BUFFER bytes.
396+
// 4KB=4096, 8KB=8192, 16KB=16384, 32KB=32768, 64KB=65536,
397+
// 128KB=131072, 256KB=262144, 512KB=524288
398+
// the buffer should set to SRS_MR_MAX_SLEEP_MS*kbps/8,
399+
// for example, your system delivery stream in 1000kbps,
400+
// sleep 800ms for small bytes, the buffer should set to:
401+
// 800*1000/8=100000B(about 128KB).
402+
// 2000*3000/8=750000B(about 732KB).
403+
int kbps = 3000;
404+
int socket_buffer_size = mr_sleep * kbps / 8;
405+
406+
// socket recv buffer, system will double it.
407+
int nb_rbuf = socket_buffer_size / 2;
408+
socklen_t sock_buf_size = sizeof(int);
409+
if (setsockopt(mr_fd, SOL_SOCKET, SO_RCVBUF, &nb_rbuf, sock_buf_size) < 0) {
410+
srs_warn("set sock SO_RCVBUF=%d failed.", nb_rbuf);
411+
}
412+
getsockopt(mr_fd, SOL_SOCKET, SO_RCVBUF, &nb_rbuf, &sock_buf_size);
413+
414+
srs_trace("merged read sockbuf=%d, actual=%d, sleep %d when nread<=%d",
415+
socket_buffer_size, nb_rbuf, mr_sleep, SRS_MR_SMALL_BYTES);
416+
417+
rtmp->set_recv_buffer(nb_rbuf);
418+
}
419+
#endif
420+
421+
// update to new state
422+
mr = mr_enabled;
423+
mr_sleep = sleep_ms;
424+
425+
#ifdef SRS_PERF_MERGED_READ
426+
// apply new state.
427+
if (mr) {
428+
// enable the merge read
429+
// @see https://github.com/winlinvip/simple-rtmp-server/issues/241
430+
rtmp->set_merge_read(true, this);
431+
} else {
432+
// disable the merge read
433+
// @see https://github.com/winlinvip/simple-rtmp-server/issues/241
434+
rtmp->set_merge_read(false, NULL);
435+
}
436+
#endif
437+
}
438+

trunk/src/app/srs_app_recv_thread.hpp

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,11 +35,13 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
3535
#include <srs_app_thread.hpp>
3636
#include <srs_protocol_buffer.hpp>
3737
#include <srs_core_performance.hpp>
38+
#include <srs_app_reload.hpp>
3839

3940
class SrsRtmpServer;
4041
class SrsMessage;
4142
class SrsRtmpConn;
4243
class SrsSource;
44+
class SrsRequest;
4345

4446
/**
4547
* for the recv thread to handle the message.
@@ -138,15 +140,19 @@ class SrsPublishRecvThread : virtual public ISrsMessageHandler
138140
#ifdef SRS_PERF_MERGED_READ
139141
, virtual public IMergeReadHandler
140142
#endif
143+
, virtual public ISrsReloadHandler
141144
{
142145
private:
143146
SrsRecvThread trd;
144147
SrsRtmpServer* rtmp;
148+
SrsRequest* req;
145149
// the msgs already got.
146150
int64_t _nb_msgs;
147151
// for mr(merged read),
148152
// @see https://github.com/winlinvip/simple-rtmp-server/issues/241
153+
bool mr;
149154
int mr_fd;
155+
int mr_sleep;
150156
// the recv thread error code.
151157
int recv_error_code;
152158
SrsRtmpConn* _conn;
@@ -158,7 +164,8 @@ class SrsPublishRecvThread : virtual public ISrsMessageHandler
158164
// @see https://github.com/winlinvip/simple-rtmp-server/issues/244
159165
st_cond_t error;
160166
public:
161-
SrsPublishRecvThread(SrsRtmpServer* rtmp_sdk, int fd, int timeout_ms,
167+
SrsPublishRecvThread(SrsRtmpServer* rtmp_sdk,
168+
SrsRequest* _req, int mr_sock_fd, int timeout_ms,
162169
SrsRtmpConn* conn, SrsSource* source, bool is_fmle, bool is_edge);
163170
virtual ~SrsPublishRecvThread();
164171
public:
@@ -183,6 +190,11 @@ class SrsPublishRecvThread : virtual public ISrsMessageHandler
183190
#ifdef SRS_PERF_MERGED_READ
184191
virtual void on_read(ssize_t nread);
185192
#endif
193+
// interface ISrsReloadHandler
194+
public:
195+
virtual int on_reload_vhost_mr(std::string vhost);
196+
private:
197+
virtual void update_buffer(bool mr_enabled, int sleep_ms);
186198
};
187199

188200
#endif

trunk/src/app/srs_app_reload.cpp

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,11 @@ int ISrsReloadHandler::on_reload_vhost_dvr(string /*vhost*/)
140140
return ERROR_SUCCESS;
141141
}
142142

143+
int ISrsReloadHandler::on_reload_vhost_mr(string /*vhost*/)
144+
{
145+
return ERROR_SUCCESS;
146+
}
147+
143148
int ISrsReloadHandler::on_reload_vhost_transcode(string /*vhost*/)
144149
{
145150
return ERROR_SUCCESS;

trunk/src/app/srs_app_reload.hpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ class ISrsReloadHandler
6565
virtual int on_reload_vhost_forward(std::string vhost);
6666
virtual int on_reload_vhost_hls(std::string vhost);
6767
virtual int on_reload_vhost_dvr(std::string vhost);
68+
virtual int on_reload_vhost_mr(std::string vhost);
6869
virtual int on_reload_vhost_transcode(std::string vhost);
6970
virtual int on_reload_ingest_removed(std::string vhost, std::string ingest_id);
7071
virtual int on_reload_ingest_added(std::string vhost, std::string ingest_id);

0 commit comments

Comments
 (0)