Skip to content

SRT: Support Coroutine Native SRT over ST. #3010

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 41 commits into from
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
aff6102
Refine SRT code, with StateThread adpater
xiaozhihong Apr 15, 2022
193fcbd
SRT: Support debugging with CLion.
winlinvip Apr 16, 2022
a499a8c
SRT: Eliminate unused files for SRT.
winlinvip Apr 16, 2022
85e9581
SRT: check srt_connect return value
xiaozhihong Apr 16, 2022
7352bfe
SRT: support rtmp to srt
xiaozhihong Apr 20, 2022
5d33eec
SRT: fix utest failed
xiaozhihong Apr 22, 2022
8cea168
SRT: Fix cmake bug, quit if error.
winlinvip May 23, 2022
05d3d96
SRT: Update full.conf for new configs.
winlinvip May 23, 2022
4ce091e
SRT: Update full.conf for review.
winlinvip May 23, 2022
86d48c4
SRT: Add srt vhost section to full.conf.
winlinvip May 25, 2022
738e110
SRT: Refine parse SRT listen ip and port.
winlinvip May 28, 2022
bc59917
SRT: Refine the schedule resolution to 10ms if idle.
winlinvip May 28, 2022
f5101ae
SRT: Initialize SRT eventloop in adapter.
winlinvip May 28, 2022
16063d1
SRT: Refine get_srt_poller to poller.
winlinvip May 28, 2022
a7725f6
SRT: Refine the SRT socket code.
winlinvip May 28, 2022
14fb187
SRT: Add SRT option annotation in full.conf
xiaozhihong May 30, 2022
8558fb9
SRT: Tsbpdmode default on
xiaozhihong May 30, 2022
a3e0a3c
SRT: refine int to srs_utime_t in srt timeout config
xiaozhihong May 30, 2022
c096dfa
SRT: add option peer_idle_timeout in full.conf
xiaozhihong May 30, 2022
40725ab
SRT: use srs code style in function defination
xiaozhihong May 30, 2022
b217eaf
SRT: remove mix_correct
xiaozhihong May 30, 2022
c4abac8
SRT: remove rtmp_to_srt
xiaozhihong May 30, 2022
563cb81
SRT: Rename srs_service_st_srt to srs_protocol_srt
winlinvip Jun 7, 2022
083b8ce
SRT: Wrap SRT stat object.
winlinvip Jun 7, 2022
031ea96
SRT: Extract ISrsSrtPoller to hide SRT_EPOLL_EVENT
winlinvip Jun 7, 2022
248ce93
SRT: Hide srt implements from API.
winlinvip Jun 7, 2022
99a6d72
SRT: Undo extract of fetch_or_create_source and change mode_ to local.
winlinvip Jun 8, 2022
b2b6087
SRT: Refine packet error handler.
winlinvip Jun 8, 2022
a89c725
SRT: Add todo for coroutine yield.
winlinvip Jun 9, 2022
5933942
SRT: Merge develop, rename bridger to bridge.
winlinvip Jun 9, 2022
2668fec
SRT: Merge develop, rename filename in protocol.
winlinvip Jun 9, 2022
85591e9
SRT: Merge develop, prefix for protocol files.
winlinvip Jun 9, 2022
d57fd6b
Merge branch 'develop' into feature/srt
winlinvip Jun 9, 2022
93d0cfb
SRT: Merge develop, fix build fail.
winlinvip Jun 9, 2022
4c32b52
SRT: Refine code, remove SrsSrtListenerType
winlinvip Jun 10, 2022
424f5ec
SRT: Change bridges to bridge.
winlinvip Jun 10, 2022
f933e7f
SRT: move accept log into srt conn cycle
xiaozhihong Jun 12, 2022
9d97402
SRT: readd mix_correct, compatible old srt conf
xiaozhihong Jun 12, 2022
b7aad03
SRT: add srs_core_lock, support scope lock guard
xiaozhihong Jun 12, 2022
edc291a
SRT: add srt log handle, srs log supoort multithread
xiaozhihong Jun 12, 2022
d7dcf6d
SRT: Refine the lock for log.
winlinvip Jun 14, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
SRT: Merge develop, rename bridger to bridge.
  • Loading branch information
winlinvip committed Jun 9, 2022
commit 59339420a8fe1bf689e670308107e55381afcef6
50 changes: 25 additions & 25 deletions trunk/src/app/srs_app_rtc_source.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,7 @@ ISrsRtcSourceEventHandler::~ISrsRtcSourceEventHandler()
{
}

ISrsRtcSourceBridger::ISrsRtcSourceBridger(SrsBridgeDestType type) : ISrsBridge(type)
ISrsRtcSourceBridge::ISrsRtcSourceBridge(SrsBridgeDestType type) : ISrsBridge(type)
{
}

Expand All @@ -343,13 +343,14 @@ SrsRtcSource::~SrsRtcSource()
// for all consumers are auto free.
consumers.clear();

srs_freep(req);
for (vector<ISrsRtcSourceBridger*>::iterator iter = bridgers_.begin(); iter != bridgers_.end(); ++iter) {
ISrsRtcSourceBridger* bridge = *iter;
for (vector<ISrsRtcSourceBridge*>::iterator iter = bridges_.begin(); iter != bridges_.end(); ++iter) {
ISrsRtcSourceBridge* bridge = *iter;
srs_freep(bridge);
}
bridgers_.clear();
bridges_.clear();

srs_freep(stream_desc_);
srs_freep(req);
}

srs_error_t SrsRtcSource::initialize(SrsRequest* r)
Expand Down Expand Up @@ -462,16 +463,16 @@ SrsContextId SrsRtcSource::pre_source_id()

void SrsRtcSource::set_bridge(ISrsRtcSourceBridge *bridge)
{
for (vector<ISrsRtcSourceBridger*>::iterator iter = bridgers_.begin(); iter != bridgers_.end(); ++iter) {
ISrsRtcSourceBridger* b = *iter;
if (b->get_type() == bridger->get_type()) {
for (vector<ISrsRtcSourceBridge*>::iterator iter = bridges_.begin(); iter != bridges_.end(); ++iter) {
ISrsRtcSourceBridge* b = *iter;
if (b->get_type() == bridge->get_type()) {
srs_freep(b);
*iter = bridger;
*iter = bridge;
return;
}
}

bridgers_.push_back(bridger);
bridges_.push_back(bridge);
}

srs_error_t SrsRtcSource::create_consumer(SrsRtcConsumer*& consumer)
Expand Down Expand Up @@ -544,11 +545,11 @@ srs_error_t SrsRtcSource::on_publish()
}

// If bridge to other source, handle event and start timer to request PLI.
if (! bridgers_.empty()) {
for (vector<ISrsRtcSourceBridger*>::iterator iter = bridgers_.begin(); iter != bridgers_.end(); ++iter) {
ISrsRtcSourceBridger* bridge = *iter;
if (! bridges_.empty()) {
for (vector<ISrsRtcSourceBridge*>::iterator iter = bridges_.begin(); iter != bridges_.end(); ++iter) {
ISrsRtcSourceBridge* bridge = *iter;
if ((err = bridge->on_publish()) != srs_success) {
return srs_error_wrap(err, "bridger on publish");
return srs_error_wrap(err, "bridge on publish");
}
}

Expand Down Expand Up @@ -587,17 +588,17 @@ void SrsRtcSource::on_unpublish()
h->on_unpublish();
}

//free bridger resource
if (! bridgers_.empty()) {
//free bridge resource
if (! bridges_.empty()) {
// For SrsRtcSource::on_timer()
_srs_hybrid->timer100ms()->unsubscribe(this);

for (vector<ISrsRtcSourceBridger*>::iterator iter = bridgers_.begin(); iter != bridgers_.end(); ++iter) {
ISrsRtcSourceBridger* bridge = *iter;
for (vector<ISrsRtcSourceBridge*>::iterator iter = bridges_.begin(); iter != bridges_.end(); ++iter) {
ISrsRtcSourceBridge* bridge = *iter;
bridge->on_unpublish();
srs_freep(bridge);
}
bridgers_.clear();
bridges_.clear();
}

SrsStatistic* stat = SrsStatistic::instance();
Expand Down Expand Up @@ -647,10 +648,10 @@ srs_error_t SrsRtcSource::on_rtp(SrsRtpPacket* pkt)
}
}

for (vector<ISrsRtcSourceBridger*>::iterator iter = bridgers_.begin(); iter != bridgers_.end(); ++iter) {
ISrsRtcSourceBridger* bridge = *iter;
for (vector<ISrsRtcSourceBridge*>::iterator iter = bridges_.begin(); iter != bridges_.end(); ++iter) {
ISrsRtcSourceBridge* bridge = *iter;
if ((err = bridge->on_rtp(pkt)) != srs_success) {
return srs_error_wrap(err, "bridger consume message");
return srs_error_wrap(err, "bridge consume message");
}
}

Expand Down Expand Up @@ -724,8 +725,7 @@ srs_error_t SrsRtcSource::on_timer(srs_utime_t interval)
}

#ifdef SRS_FFMPEG_FIT
SrsRtcFromRtmpBridger::SrsRtcFromRtmpBridger(SrsRtcSource* source)
: ISrsLiveSourceBridger(SrsBridgeDestTypeRTC)
SrsRtcFromRtmpBridge::SrsRtcFromRtmpBridge(SrsRtcSource* source) : ISrsLiveSourceBridge(SrsBridgeDestTypeRTC)
{
req = NULL;
source_ = source;
Expand Down Expand Up @@ -1292,7 +1292,7 @@ srs_error_t SrsRtcFromRtmpBridge::consume_packets(vector<SrsRtpPacket*>& pkts)
return err;
}

SrsRtmpFromRtcBridger::SrsRtmpFromRtcBridger(SrsLiveSource *src) : ISrsRtcSourceBridger(SrsBridgeDestTypeRtmp)
SrsRtmpFromRtcBridge::SrsRtmpFromRtcBridge(SrsLiveSource *src) : ISrsRtcSourceBridge(SrsBridgeDestTypeRtmp)
{
source_ = src;
codec_ = NULL;
Expand Down
10 changes: 5 additions & 5 deletions trunk/src/app/srs_app_rtc_source.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -144,11 +144,11 @@ class ISrsRtcSourceEventHandler
};

// SrsRtcSource bridge to SrsLiveSource
class ISrsRtcSourceBridger : public ISrsBridge
class ISrsRtcSourceBridge : public ISrsBridge
{
public:
ISrsRtcSourceBridger(SrsBridgeDestType type);
virtual ~ISrsRtcSourceBridger();
ISrsRtcSourceBridge(SrsBridgeDestType type);
virtual ~ISrsRtcSourceBridge();
public:
virtual srs_error_t on_publish() = 0;
virtual srs_error_t on_rtp(SrsRtpPacket *pkt) = 0;
Expand All @@ -170,8 +170,8 @@ class SrsRtcSource : public ISrsFastTimer
ISrsRtcPublishStream* publish_stream_;
// Steam description for this steam.
SrsRtcSourceDescription* stream_desc_;
// The Source bridger, bridger stream to other source.
std::vector<ISrsRtcSourceBridger*> bridgers_;
// The Source bridge, bridge stream to other source.
std::vector<ISrsRtcSourceBridge*> bridges_;
private:
// To delivery stream to clients.
std::vector<SrsRtcConsumer*> consumers;
Expand Down
44 changes: 22 additions & 22 deletions trunk/src/app/srs_app_source.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1923,7 +1923,7 @@ SrsBridgeDestType ISrsBridge::get_type() const
return type_;
}

ISrsLiveSourceBridger::ISrsLiveSourceBridger(SrsBridgeDestType type) : ISrsBridge(type)
ISrsLiveSourceBridge::ISrsLiveSourceBridge(SrsBridgeDestType type) : ISrsBridge(type)
{
}

Expand All @@ -1942,7 +1942,7 @@ SrsLiveSource::SrsLiveSource()
die_at = 0;

handler = NULL;

play_edge = new SrsPlayEdge();
publish_edge = new SrsPublishEdge();
gop_cache = new SrsGopCache();
Expand Down Expand Up @@ -1973,11 +1973,11 @@ SrsLiveSource::~SrsLiveSource()
srs_freep(gop_cache);

srs_freep(req);
for (vector<ISrsLiveSourceBridger*>::iterator iter = bridgers_.begin(); iter != bridgers_.end(); ++iter) {
ISrsLiveSourceBridger* bridge = *iter;
for (vector<ISrsLiveSourceBridge*>::iterator iter = bridges_.begin(); iter != bridges_.end(); ++iter) {
ISrsLiveSourceBridge* bridge = *iter;
srs_freep(bridge);
}
bridgers_.clear();
bridges_.clear();
}

void SrsLiveSource::dispose()
Expand Down Expand Up @@ -2055,16 +2055,16 @@ srs_error_t SrsLiveSource::initialize(SrsRequest* r, ISrsLiveSourceHandler* h)

void SrsLiveSource::set_bridge(ISrsLiveSourceBridge* v)
{
for (vector<ISrsLiveSourceBridger*>::iterator iter = bridgers_.begin(); iter != bridgers_.end(); ++iter) {
ISrsLiveSourceBridger* bridge = *iter;
for (vector<ISrsLiveSourceBridge*>::iterator iter = bridges_.begin(); iter != bridges_.end(); ++iter) {
ISrsLiveSourceBridge* bridge = *iter;
if (v->get_type() == bridge->get_type()) {
srs_freep(bridge);
*iter = v;
return;
}
}

bridgers_.push_back(v);
bridges_.push_back(v);
}

srs_error_t SrsLiveSource::on_reload_vhost_play(string vhost)
Expand Down Expand Up @@ -2316,11 +2316,11 @@ srs_error_t SrsLiveSource::on_audio_imp(SrsSharedPtrMessage* msg)
return srs_error_wrap(err, "consume audio");
}

// For bridger to consume the message.
for (vector<ISrsLiveSourceBridger*>::iterator iter = bridgers_.begin(); iter != bridgers_.end(); ++iter) {
ISrsLiveSourceBridger* bridge = *iter;
// For bridge to consume the message.
for (vector<ISrsLiveSourceBridge*>::iterator iter = bridges_.begin(); iter != bridges_.end(); ++iter) {
ISrsLiveSourceBridge* bridge = *iter;
if ((err = bridge->on_audio(msg)) != srs_success) {
return srs_error_wrap(err, "bridger consume audio");
return srs_error_wrap(err, "bridge consume audio");
}
}

Expand Down Expand Up @@ -2449,11 +2449,11 @@ srs_error_t SrsLiveSource::on_video_imp(SrsSharedPtrMessage* msg)
return srs_error_wrap(err, "hub consume video");
}

// For bridger to consume the message.
for (vector<ISrsLiveSourceBridger*>::iterator iter = bridgers_.begin(); iter != bridgers_.end(); ++iter) {
ISrsLiveSourceBridger* bridge = *iter;
// For bridge to consume the message.
for (vector<ISrsLiveSourceBridge*>::iterator iter = bridges_.begin(); iter != bridges_.end(); ++iter) {
ISrsLiveSourceBridge* bridge = *iter;
if ((err = bridge->on_video(msg)) != srs_success) {
return srs_error_wrap(err, "bridger consume video");
return srs_error_wrap(err, "bridge consume video");
}
}

Expand Down Expand Up @@ -2617,10 +2617,10 @@ srs_error_t SrsLiveSource::on_publish()
return srs_error_wrap(err, "handle publish");
}

for (vector<ISrsLiveSourceBridger*>::iterator iter = bridgers_.begin(); iter != bridgers_.end(); ++iter) {
ISrsLiveSourceBridger* bridge = *iter;
for (vector<ISrsLiveSourceBridge*>::iterator iter = bridges_.begin(); iter != bridges_.end(); ++iter) {
ISrsLiveSourceBridge* bridge = *iter;
if ((err = bridge->on_publish()) != srs_success) {
return srs_error_wrap(err, "bridger publish");
return srs_error_wrap(err, "bridge publish");
}
}

Expand Down Expand Up @@ -2665,12 +2665,12 @@ void SrsLiveSource::on_unpublish()

handler->on_unpublish(this, req);

for (vector<ISrsLiveSourceBridger*>::iterator iter = bridgers_.begin(); iter != bridgers_.end(); ++iter) {
ISrsLiveSourceBridger* bridge = *iter;
for (vector<ISrsLiveSourceBridge*>::iterator iter = bridges_.begin(); iter != bridges_.end(); ++iter) {
ISrsLiveSourceBridge* bridge = *iter;
bridge->on_unpublish();
srs_freep(bridge);
}
bridgers_.clear();
bridges_.clear();

// no consumer, stream is die.
if (consumers.empty()) {
Expand Down
10 changes: 5 additions & 5 deletions trunk/src/app/srs_app_source.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -486,11 +486,11 @@ class ISrsBridge {
};

// For RTMP2RTC, bridge SrsLiveSource to SrsRtcSource
class ISrsLiveSourceBridger : public ISrsBridge
class ISrsLiveSourceBridge : public ISrsBridge
{
public:
ISrsLiveSourceBridger(SrsBridgeDestType type);
virtual ~ISrsLiveSourceBridger();
ISrsLiveSourceBridge(SrsBridgeDestType type);
virtual ~ISrsLiveSourceBridge();
public:
virtual srs_error_t on_publish() = 0;
virtual srs_error_t on_audio(SrsSharedPtrMessage* audio) = 0;
Expand Down Expand Up @@ -531,8 +531,8 @@ class SrsLiveSource : public ISrsReloadHandler
int64_t last_packet_time;
// The event handler.
ISrsLiveSourceHandler* handler;
// The source bridger for other source.
std::vector<ISrsLiveSourceBridger*> bridgers_;
// The source bridge for other source.
std::vector<ISrsLiveSourceBridge*> bridges_;
// The edge control service
SrsPlayEdge* play_edge;
SrsPublishEdge* publish_edge;
Expand Down
10 changes: 5 additions & 5 deletions trunk/src/app/srs_app_srt_conn.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -333,13 +333,13 @@ srs_error_t SrsMpegtsSrtConn::acquire_publish()
return srs_error_wrap(err, "create source");
}

SrsRtmpFromSrtBridge *bridger = new SrsRtmpFromSrtBridge(live_source);
if ((err = bridger->initialize(req_)) != srs_success) {
srs_freep(bridger);
return srs_error_wrap(err, "create bridger");
SrsRtmpFromSrtBridge *bridge = new SrsRtmpFromSrtBridge(live_source);
if ((err = bridge->initialize(req_)) != srs_success) {
srs_freep(bridge);
return srs_error_wrap(err, "create bridge");
}

srt_source_->set_bridger(bridger);
srt_source_->set_bridge(bridge);
}

if ((err = srt_source_->on_publish()) != srs_success) {
Expand Down
26 changes: 13 additions & 13 deletions trunk/src/app/srs_app_srt_source.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -653,11 +653,11 @@ SrsSrtSource::~SrsSrtSource()
// for all consumers are auto free.
consumers.clear();

for (vector<ISrsSrtSourceBridge*>::iterator iter = bridgers_.begin(); iter != bridgers_.end(); ++iter) {
for (vector<ISrsSrtSourceBridge*>::iterator iter = bridges_.begin(); iter != bridges_.end(); ++iter) {
ISrsSrtSourceBridge* bridge = *iter;
srs_freep(bridge);
}
bridgers_.clear();
bridges_.clear();
}

srs_error_t SrsSrtSource::initialize(SrsRequest* r)
Expand Down Expand Up @@ -707,18 +707,18 @@ void SrsSrtSource::update_auth(SrsRequest* r)
req->update_auth(r);
}

void SrsSrtSource::set_bridger(ISrsSrtSourceBridge *bridger)
void SrsSrtSource::set_bridge(ISrsSrtSourceBridge *bridge)
{
for (vector<ISrsSrtSourceBridge*>::iterator iter = bridgers_.begin(); iter != bridgers_.end(); ++iter) {
for (vector<ISrsSrtSourceBridge*>::iterator iter = bridges_.begin(); iter != bridges_.end(); ++iter) {
ISrsSrtSourceBridge* b = *iter;
if (b->get_type() == bridger->get_type()) {
if (b->get_type() == bridge->get_type()) {
srs_freep(b);
*iter = bridger;
*iter = bridge;
return;
}
}

bridgers_.push_back(bridger);
bridges_.push_back(bridge);
}

srs_error_t SrsSrtSource::create_consumer(SrsSrtConsumer*& consumer)
Expand Down Expand Up @@ -765,10 +765,10 @@ srs_error_t SrsSrtSource::on_publish()
return srs_error_wrap(err, "source id change");
}

for (vector<ISrsSrtSourceBridge*>::iterator iter = bridgers_.begin(); iter != bridgers_.end(); ++iter) {
for (vector<ISrsSrtSourceBridge*>::iterator iter = bridges_.begin(); iter != bridges_.end(); ++iter) {
ISrsSrtSourceBridge* bridge = *iter;
if ((err = bridge->on_publish()) != srs_success) {
return srs_error_wrap(err, "bridger on publish");
return srs_error_wrap(err, "bridge on publish");
}
}

Expand All @@ -787,12 +787,12 @@ void SrsSrtSource::on_unpublish()

can_publish_ = true;

for (vector<ISrsSrtSourceBridge*>::iterator iter = bridgers_.begin(); iter != bridgers_.end(); ++iter) {
for (vector<ISrsSrtSourceBridge*>::iterator iter = bridges_.begin(); iter != bridges_.end(); ++iter) {
ISrsSrtSourceBridge* bridge = *iter;
bridge->on_unpublish();
srs_freep(bridge);
}
bridgers_.clear();
bridges_.clear();
}

srs_error_t SrsSrtSource::on_packet(SrsSrtPacket* packet)
Expand All @@ -806,10 +806,10 @@ srs_error_t SrsSrtSource::on_packet(SrsSrtPacket* packet)
}
}

for (vector<ISrsSrtSourceBridge*>::iterator iter = bridgers_.begin(); iter != bridgers_.end(); ++iter) {
for (vector<ISrsSrtSourceBridge*>::iterator iter = bridges_.begin(); iter != bridges_.end(); ++iter) {
ISrsSrtSourceBridge* bridge = *iter;
if ((err = bridge->on_packet(packet)) != srs_success) {
return srs_error_wrap(err, "bridger consume message");
return srs_error_wrap(err, "bridge consume message");
}
}

Expand Down
Loading
You are viewing a condensed version of this merge commit. You can view the full changes here.