Skip to content

Commit cc6aca9

Browse files
committed
fix #194, writev multiple msgs, support 6k+ 250kbps clients. 2.0.15.
1 parent 7cf855f commit cc6aca9

File tree

8 files changed

+251
-29
lines changed

8 files changed

+251
-29
lines changed

README.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -242,6 +242,7 @@ Supported operating systems and hardware:
242242
* 2013-10-17, Created.<br/>
243243

244244
## History
245+
* v2.0, 2014-11-08, fix [#194](https://github.com/winlinvip/simple-rtmp-server/issues/194), writev multiple msgs, support 6k+ 250kbps clients. 2.0.15.
245246
* v2.0, 2014-11-08, fix [#194](https://github.com/winlinvip/simple-rtmp-server/issues/194), optmized st for timeout recv. pulse to 500ms. 2.0.14.
246247
* v2.0, 2014-11-08, fix [#195](https://github.com/winlinvip/simple-rtmp-server/issues/195), remove the confuse code st_usleep(0). 2.0.13.
247248
* v2.0, 2014-11-08, fix [#191](https://github.com/winlinvip/simple-rtmp-server/issues/191), configure --export-librtmp-project and --export-librtmp-single. 2.0.11.
@@ -449,6 +450,7 @@ Performance benchmark history, on virtual box:
449450
* 2014-11-11, SRS 1.0.5, 2700clients, 85%CPU, 66MB. (1.0 equals 2.0.12)
450451
* 2014-11-12, SRS 2.0.14, 2700clients, 69%CPU, 59MB.
451452
* 2014-11-12, SRS 2.0.14, 3500clients, 95%CPU, 78MB.
453+
* 2014-11-13, SRS 2.0.15, 6000clients, 82%CPU, 203MB.
452454

453455
Latest benchmark(2014-07-12):
454456

trunk/src/app/srs_app_rtmp_conn.cpp

Lines changed: 29 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -525,10 +525,7 @@ int SrsRtmpConn::playing(SrsSource* source)
525525
int64_t starttime = -1;
526526

527527
while (true) {
528-
// collect elapse for pithy print.
529-
pithy_print.elapse();
530-
531-
// to use isolate thread to recv, can improve about 5% performance.
528+
// TODO: to use isolate thread to recv, can improve about 5% performance.
532529
// @see: https://github.com/winlinvip/simple-rtmp-server/issues/196
533530
// read from client.
534531
if (true) {
@@ -539,6 +536,7 @@ int SrsRtmpConn::playing(SrsSource* source)
539536
if (ret == ERROR_SOCKET_TIMEOUT) {
540537
// it's ok, do nothing.
541538
ret = ERROR_SUCCESS;
539+
srs_verbose("recv timeout, ignore. ret=%d", ret);
542540
} else if (ret != ERROR_SUCCESS) {
543541
if (!srs_is_client_gracefully_close(ret)) {
544542
srs_error("recv client control message failed. ret=%d", ret);
@@ -554,6 +552,9 @@ int SrsRtmpConn::playing(SrsSource* source)
554552
}
555553
}
556554

555+
// collect elapse for pithy print.
556+
pithy_print.elapse();
557+
557558
// get messages from consumer.
558559
int count = 0;
559560
if ((ret = consumer->dump_packets(msgs.size, msgs.msgs, count)) != ERROR_SUCCESS) {
@@ -568,22 +569,16 @@ int SrsRtmpConn::playing(SrsSource* source)
568569
" time=%"PRId64", msgs=%d, okbps=%d,%d,%d, ikbps=%d,%d,%d",
569570
pithy_print.age(), count,
570571
kbps->get_send_kbps(), kbps->get_send_kbps_30s(), kbps->get_send_kbps_5m(),
571-
kbps->get_recv_kbps(), kbps->get_recv_kbps_30s(), kbps->get_recv_kbps_5m());
572+
kbps->get_recv_kbps(), kbps->get_recv_kbps_30s(), kbps->get_recv_kbps_5m()
573+
);
572574
}
573575

574-
// sendout messages
575-
// @remark, becareful, all msgs must be free explicitly,
576-
// free by send_and_free_message or srs_freep.
577-
for (int i = 0; i < count; i++) {
578-
SrsSharedPtrMessage* msg = msgs.msgs[i];
579-
580-
// the send_message will free the msg,
581-
// so set the msgs[i] to NULL.
582-
msgs.msgs[i] = NULL;
583-
584-
// only when user specifies the duration,
585-
// we start to collect the durations for each message.
586-
if (user_specified_duration_to_stop) {
576+
// only when user specifies the duration,
577+
// we start to collect the durations for each message.
578+
if (user_specified_duration_to_stop) {
579+
for (int i = 0; i < count; i++) {
580+
SrsSharedPtrMessage* msg = msgs.msgs[i];
581+
587582
// foreach msg, collect the duration.
588583
// @remark: never use msg when sent it, for the protocol sdk will free it.
589584
if (starttime < 0 || starttime > msg->header.timestamp) {
@@ -592,12 +587,23 @@ int SrsRtmpConn::playing(SrsSource* source)
592587
duration += msg->header.timestamp - starttime;
593588
starttime = msg->header.timestamp;
594589
}
595-
590+
}
591+
592+
// sendout messages
593+
// @remark, becareful, all msgs must be free explicitly,
594+
// free by send_and_free_message or srs_freep.
595+
if (count > 0) {
596596
// no need to assert msg, for the rtmp will assert it.
597-
if ((ret = rtmp->send_and_free_message(msg, res->stream_id)) != ERROR_SUCCESS) {
598-
srs_error("send message to client failed. ret=%d", ret);
599-
return ret;
600-
}
597+
ret = rtmp->send_and_free_messages(msgs.msgs, count, res->stream_id);
598+
}
599+
for (int i = 0; i < count; i++) {
600+
// the send_message will free the msg,
601+
// so set the msgs[i] to NULL.
602+
msgs.msgs[i] = NULL;
603+
}
604+
if (ret != ERROR_SUCCESS) {
605+
srs_error("send messages to client failed. ret=%d", ret);
606+
return ret;
601607
}
602608

603609
// if duration specified, and exceed it, stop play live.

trunk/src/core/srs_core.hpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
3131
// current release version
3232
#define VERSION_MAJOR 2
3333
#define VERSION_MINOR 0
34-
#define VERSION_REVISION 14
34+
#define VERSION_REVISION 15
3535
// server info.
3636
#define RTMP_SIG_SRS_KEY "SRS"
3737
#define RTMP_SIG_SRS_ROLE "origin/edge server"

trunk/src/kernel/srs_kernel_consts.hpp

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,19 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
9797
// always use fmt0 as cache.
9898
//#define SRS_CONSTS_RTMP_MAX_FMT3_HEADER_SIZE 5
9999

100+
/**
101+
* for performance issue,
102+
* the iovs cache, @see https://github.com/winlinvip/simple-rtmp-server/issues/194
103+
* iovs cache for multiple messages for each connections.
104+
*/
105+
#define SRS_CONSTS_IOVS_MAX 1024
106+
/**
107+
* for performance issue,
108+
* the c0c3 cache, @see https://github.com/winlinvip/simple-rtmp-server/issues/194
109+
* c0c3 cache for multiple messages for each connections.
110+
*/
111+
#define SRS_CONSTS_C0C3_HEADERS_MAX 4096
112+
100113
///////////////////////////////////////////////////////////
101114
///////////////////////////////////////////////////////////
102115
///////////////////////////////////////////////////////////

trunk/src/rtmp/srs_protocol_rtmp.cpp

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -771,6 +771,11 @@ int SrsRtmpServer::send_and_free_message(SrsMessage* msg, int stream_id)
771771
return protocol->send_and_free_message(msg, stream_id);
772772
}
773773

774+
int SrsRtmpServer::send_and_free_messages(SrsSharedPtrMessage** msgs, int nb_msgs, int stream_id)
775+
{
776+
return protocol->send_and_free_messages(msgs, nb_msgs, stream_id);
777+
}
778+
774779
int SrsRtmpServer::send_and_free_packet(SrsPacket* packet, int stream_id)
775780
{
776781
return protocol->send_and_free_packet(packet, stream_id);

trunk/src/rtmp/srs_protocol_rtmp.hpp

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -368,6 +368,15 @@ class SrsRtmpServer
368368
*/
369369
virtual int send_and_free_message(SrsMessage* msg, int stream_id);
370370
/**
371+
* send the RTMP message and always free it.
372+
* user must never free or use the msg after this method,
373+
* for it will always free the msg.
374+
* @param msgs, the msgs to send out, never be NULL.
375+
* @param nb_msgs, the size of msgs to send out.
376+
* @param stream_id, the stream id of packet to send over, 0 for control message.
377+
*/
378+
virtual int send_and_free_messages(SrsSharedPtrMessage** msgs, int nb_msgs, int stream_id);
379+
/**
371380
* send the RTMP packet and always free it.
372381
* user must never free or use the packet after this method,
373382
* for it will always free the packet.

trunk/src/rtmp/srs_protocol_stack.cpp

Lines changed: 167 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
2929
#include <srs_core_autofree.hpp>
3030
#include <srs_kernel_utility.hpp>
3131

32+
#include <stdlib.h>
3233
using namespace std;
3334

3435
// when got a messae header, there must be some data,
@@ -404,7 +405,15 @@ SrsProtocol::SrsProtocol(ISrsProtocolReaderWriter* io)
404405
in_buffer = new SrsBuffer();
405406
skt = io;
406407

407-
in_chunk_size = out_chunk_size = SRS_CONSTS_RTMP_PROTOCOL_CHUNK_SIZE;
408+
in_chunk_size = SRS_CONSTS_RTMP_PROTOCOL_CHUNK_SIZE;
409+
out_chunk_size = SRS_CONSTS_RTMP_PROTOCOL_CHUNK_SIZE;
410+
411+
nb_out_iovs = SRS_CONSTS_IOVS_MAX;
412+
out_iovs = (iovec*)malloc(sizeof(iovec) * nb_out_iovs);
413+
// each chunk consumers atleast 2 iovs
414+
srs_assert(nb_out_iovs >= 2);
415+
416+
warned_c0c3_caches = false;
408417
}
409418

410419
SrsProtocol::~SrsProtocol()
@@ -421,6 +430,12 @@ SrsProtocol::~SrsProtocol()
421430
}
422431

423432
srs_freep(in_buffer);
433+
434+
// alloc by malloc, use free directly.
435+
if (out_iovs) {
436+
free(out_iovs);
437+
out_iovs = NULL;
438+
}
424439
}
425440

426441
void SrsProtocol::set_recv_timeout(int64_t timeout_us)
@@ -560,7 +575,7 @@ int SrsProtocol::do_send_message(SrsMessage* msg)
560575
// always has header
561576
int nbh = 0;
562577
char* header = NULL;
563-
generate_chunk_header(&msg->header, p == msg->payload, &nbh, &header);
578+
generate_chunk_header(out_c0c3_cache, &msg->header, p == msg->payload, &nbh, &header);
564579
srs_assert(nbh > 0);
565580

566581
// header iov
@@ -590,10 +605,130 @@ int SrsProtocol::do_send_message(SrsMessage* msg)
590605
return ret;
591606
}
592607

593-
void SrsProtocol::generate_chunk_header(SrsMessageHeader* mh, bool c0, int* pnbh, char** ph)
608+
int SrsProtocol::do_send_messages(SrsSharedPtrMessage** msgs, int nb_msgs)
594609
{
595-
char* cache = out_c0c3_cache;
610+
int ret = ERROR_SUCCESS;
611+
612+
// TODO: FIXME: use cache system instead.
613+
int iov_index = 0;
614+
iovec* iov = out_iovs + iov_index;
615+
616+
int c0c3_cache_index = 0;
617+
char* c0c3_cache = out_c0c3_caches + c0c3_cache_index;
618+
619+
// try to send use the c0c3 header cache,
620+
// if cache is consumed, try another loop.
621+
for (int i = 0; i < nb_msgs; i++) {
622+
SrsMessage* msg = msgs[i];
623+
624+
// ignore empty message.
625+
if (!msg->payload || msg->size <= 0) {
626+
srs_info("ignore empty message.");
627+
continue;
628+
}
629+
630+
// we donot use the complex basic header,
631+
// ensure the basic header is 1bytes.
632+
if (msg->header.perfer_cid < 2) {
633+
srs_warn("change the chunk_id=%d to default=%d",
634+
msg->header.perfer_cid, RTMP_CID_ProtocolControl);
635+
msg->header.perfer_cid = RTMP_CID_ProtocolControl;
636+
}
637+
638+
// p set to current write position,
639+
// it's ok when payload is NULL and size is 0.
640+
char* p = msg->payload;
641+
char* pend = msg->payload + msg->size;
642+
643+
// always write the header event payload is empty.
644+
while (p < pend) {
645+
// always has header
646+
int nbh = 0;
647+
char* header = NULL;
648+
generate_chunk_header(c0c3_cache, &msg->header, p == msg->payload, &nbh, &header);
649+
srs_assert(nbh > 0);
650+
651+
// header iov
652+
iov[0].iov_base = header;
653+
iov[0].iov_len = nbh;
654+
655+
// payload iov
656+
int payload_size = pend - p;
657+
if (payload_size > out_chunk_size) {
658+
payload_size = out_chunk_size;
659+
}
660+
iov[1].iov_base = p;
661+
iov[1].iov_len = payload_size;
662+
663+
// consume sendout bytes.
664+
p += payload_size;
665+
666+
// realloc the iovs if exceed,
667+
// for we donot know how many messges maybe to send entirely,
668+
// we just alloc the iovs, it's ok.
669+
if (iov_index >= nb_out_iovs - 2) {
670+
nb_out_iovs += SRS_CONSTS_IOVS_MAX;
671+
int realloc_size = sizeof(iovec) * nb_out_iovs;
672+
out_iovs = (iovec*)realloc(out_iovs, realloc_size);
673+
}
674+
675+
// to next pair of iovs
676+
iov_index += 2;
677+
iov = out_iovs + iov_index;
678+
679+
// to next c0c3 header cache
680+
c0c3_cache_index += nbh;
681+
c0c3_cache = out_c0c3_caches + c0c3_cache_index;
682+
683+
// the cache header should never be realloc again,
684+
// for the ptr is set to iovs, so we just warn user to set larger
685+
// and use another loop to send again.
686+
int c0c3_left = SRS_CONSTS_C0C3_HEADERS_MAX - c0c3_cache_index;
687+
if (c0c3_left < SRS_CONSTS_RTMP_MAX_FMT0_HEADER_SIZE) {
688+
// only warn once for a connection.
689+
if (!warned_c0c3_caches) {
690+
srs_warn("c0c3 cache header too small, recoment to %d",
691+
SRS_CONSTS_C0C3_HEADERS_MAX + SRS_CONSTS_RTMP_MAX_FMT0_HEADER_SIZE);
692+
warned_c0c3_caches = true;
693+
}
694+
695+
// when c0c3 cache dry,
696+
// sendout all messages and reset the cache, then send again.
697+
if ((ret = skt->writev(out_iovs, iov_index, NULL)) != ERROR_SUCCESS) {
698+
srs_error("send with writev failed. ret=%d", ret);
699+
return ret;
700+
}
596701

702+
// reset caches, while these cache ensure
703+
// atleast we can sendout a chunk.
704+
iov_index = 0;
705+
iov = out_iovs + iov_index;
706+
707+
c0c3_cache_index = 0;
708+
c0c3_cache = out_c0c3_caches + c0c3_cache_index;
709+
}
710+
}
711+
}
712+
713+
// maybe the iovs already sendout when c0c3 cache dry,
714+
// so just ignore when no iovs to send.
715+
if (iov_index <= 0) {
716+
return ret;
717+
}
718+
719+
// send by writev
720+
// sendout header and payload by writev.
721+
// decrease the sys invoke count to get higher performance.
722+
if ((ret = skt->writev(out_iovs, iov_index, NULL)) != ERROR_SUCCESS) {
723+
srs_error("send with writev failed. ret=%d", ret);
724+
return ret;
725+
}
726+
727+
return ret;
728+
}
729+
730+
void SrsProtocol::generate_chunk_header(char* cache, SrsMessageHeader* mh, bool c0, int* pnbh, char** ph)
731+
{
597732
// to directly set the field.
598733
char* pp = NULL;
599734

@@ -856,6 +991,34 @@ int SrsProtocol::send_and_free_message(SrsMessage* msg, int stream_id)
856991
return ret;
857992
}
858993

994+
int SrsProtocol::send_and_free_messages(SrsSharedPtrMessage** msgs, int nb_msgs, int stream_id)
995+
{
996+
// always not NULL msg.
997+
srs_assert(msgs);
998+
srs_assert(nb_msgs > 0);
999+
1000+
// update the stream id in header.
1001+
for (int i = 0; i < nb_msgs; i++) {
1002+
SrsMessage* msg = msgs[i];
1003+
// we assume that the stream_id in a group must be the same.
1004+
if (msg->header.stream_id == stream_id) {
1005+
break;
1006+
}
1007+
msg->header.stream_id = stream_id;
1008+
}
1009+
1010+
// donot use the auto free to free the msg,
1011+
// for performance issue.
1012+
int ret = do_send_messages(msgs, nb_msgs);
1013+
1014+
for (int i = 0; i < nb_msgs; i++) {
1015+
SrsMessage* msg = msgs[i];
1016+
srs_freep(msg);
1017+
}
1018+
1019+
return ret;
1020+
}
1021+
8591022
int SrsProtocol::send_and_free_packet(SrsPacket* packet, int stream_id)
8601023
{
8611024
int ret = ERROR_SUCCESS;

0 commit comments

Comments
 (0)