srs
接收客户端 ,启动 conn = new SrsresponseOnlyHttpConn(this, client_stfd, http_server);
[2019-05-21 00:21:31.469][trace][accept_client][115259][103] ##=SrsServer::accept_client(###
[2019-05-21 00:21:31.469][trace][SrsResponseOnlyHttpConn][115259][103] ##SrsResponseOnlyHttpConn::SrsResponseOnlyHttpConn ##
[2019-05-21 00:21:31.469][trace][accept_client][115259][103] #####Create SRS_AUTO_HTTP_SERVER ######
int SrsServer::accept_client(SrsListenerType type, st_netfd_t client_stfd)
{
int ret = ERROR_SUCCESS;
int fd = st_netfd_fileno(client_stfd);
srs_trace("##=SrsServer::accept_client(###");
int max_connections = _srs_config->get_max_connections();
if ((int)conns.size() >= max_connections) {
srs_error("exceed the max connections, drop client: "
"clients=%d, max=%d, fd=%d", (int)conns.size(), max_connections, fd);
srs_close_stfd(client_stfd);
return ret;
}
// avoid fd leak when fork.
// @see https://github.com/ossrs/srs/issues/518
if (true) {
int val;
if ((val = fcntl(fd, F_GETFD, 0)) < 0) {
ret = ERROR_SYSTEM_PID_GET_FILE_INFO;
srs_error("fnctl F_GETFD error! fd=%d. ret=%#x", fd, ret);
srs_close_stfd(client_stfd);
return ret;
}
val |= FD_CLOEXEC;
if (fcntl(fd, F_SETFD, val) < 0) {
ret = ERROR_SYSTEM_PID_SET_FILE_INFO;
srs_error("fcntl F_SETFD error! fd=%d ret=%#x", fd, ret);
srs_close_stfd(client_stfd);
return ret;
}
}
SrsConnection* conn = NULL;
if (type == SrsListenerRtmpStream) {
conn = new SrsRtmpConn(this, client_stfd);
} else if (type == SrsListenerHttpApi) {
#ifdef SRS_AUTO_HTTP_API
conn = new SrsHttpApi(this, client_stfd, http_api_mux);
srs_trace("##### SRS_AUTO_HTTP_API ######");
#else
srs_warn("close http client for server not support http-api");
srs_close_stfd(client_stfd);
return ret;
#endif
} else if (type == SrsListenerHttpStream) {
#ifdef SRS_AUTO_HTTP_SERVER
conn = new SrsResponseOnlyHttpConn(this, client_stfd, http_server);
srs_trace("#####Create SRS_AUTO_HTTP_SERVER ######");
#else
srs_warn("close http client for server not support http-server");
srs_close_stfd(client_stfd);
return ret;
#endif
} else if (type == SrsListenerWebsocketStream) {
#ifdef SRS_AUTO_WEBSOCKET_SERVER
conn = new WebConn(this, client_stfd, http_server);
srs_trace("#####SRS_AUTO_WEBSOCKET_SERVER ######");
#else
srs_warn("close http client for server not support http-server");
srs_close_stfd(client_stfd);
return ret;
#endif
}
else {
// TODO: FIXME: handler others
}
srs_assert(conn);
// directly enqueue, the cycle thread will remove the client.
conns.push_back(conn);
srs_verbose("add conn to vector.");
// cycle will start process thread and when finished remove the client.
// @remark never use the conn, for it maybe destroyed.
if ((ret = conn->start()) != ERROR_SUCCESS) {
return ret;
}
srs_verbose("conn started success.");
srs_verbose("accept client finished. conns=%d, ret=%d", (int)conns.size(), ret);
return ret;
}
int SrsConnection::start()
{
return pthread->start();
}
[2019-05-21 00:21:31.469][trace][start][115259][103] #####SrsOneCycleThread::start()#####
int SrsOneCycleThread::start()
{
srs_trace("#####SrsOneCycleThread::start()#####");
return pthread->start();
}
int SrsThread::start()
{
srs_trace("## SrsThread::start() ##");
int ret = ERROR_SUCCESS;
if(tid) {
srs_info("thread %s already running.", _name);
return ret;
}
if((tid = st_thread_create(thread_fun, this, (_joinable? 1:0), 0)) == NULL){
ret = ERROR_ST_CREATE_CYCLE_THREAD;
srs_error("st_thread_create failed. ret=%d", ret);
return ret;
}
disposed = false;
// we set to loop to true for thread to run.
loop = true;
// wait for cid to ready, for parent thread to get the cid.
while (_cid < 0) {
st_usleep(10 * 1000);
}
// now, cycle thread can run.
can_run = true;
return ret;
}
## SrsThread::start() ##
void* SrsThread::thread_fun(void* arg)
{
srs_trace("## SrsThread::thread_fun ##");
SrsThread* obj = (SrsThread*)arg;
srs_assert(obj);
obj->thread_cycle();
// for valgrind to detect.
SrsThreadcontext* ctx = dynamic_cast<SrsThreadContext*>(_srs_context);
if (ctx) {
ctx->clear_cid();
}
st_thread_exit(NULL);
return NULL;
}
void SrsThread::thread_cycle()
{
int ret = ERROR_SUCCESS;
srs_trace("##SrsThread::thread_cycle() ##");
_srs_context->generate_id();
srs_info("thread %s cycle start", _name);
_cid = _srs_context->get_id();
srs_assert(handler);
handler->on_thread_start();
// thread is running now.
really_terminated = false;
// wait for cid to ready, for parent thread to get the cid.
while (!can_run && loop) {
st_usleep(10 * 1000);
}
while (loop) {
if ((ret = handler->on_before_cycle()) != ERROR_SUCCESS) {
srs_warn("thread %s on before cycle failed, ignored and retry, ret=%d", _name, ret);
goto failed;
}
srs_info("thread %s on before cycle success", _name);
if ((ret = handler->cycle()) != ERROR_SUCCESS) {
if (!srs_is_client_gracefully_close(ret) && !srs_is_system_control_error(ret)) {
srs_warn("thread %s cycle failed, ignored and retry, ret=%d", _name, ret);
}
goto failed;
}
srs_info("thread %s cycle success", _name);
if ((ret = handler->on_end_cycle()) != ERROR_SUCCESS) {
srs_warn("thread %s on end cycle failed, ignored and retry, ret=%d", _name, ret);
goto failed;
}
srs_info("thread %s on end cycle success", _name);
failed:
if (!loop) {
break;
}
// to improve performance, donot sleep when Interval is zero.
// @see: https://github.com/ossrs/srs/issues/237
if (cycle_interval_us != 0) {
st_usleep(cycle_interval_us);
}
}
// readly terminated now.
really_terminated = true;
handler->on_thread_stop();
srs_info("thread %s cycle finished", _name);
}
int SrsOneCycleThread::cycle()
{
srs_trace(" ##int SrsOneCycleThread::cycle() ##");
int ret = handler->cycle();
pthread->stop_loop();
return ret;
}
nt SrsConnection::cycle()
{
int ret = ERROR_SUCCESS;
srs_trace("## SrsConnection::cycle() ##");
_srs_context->generate_id();
id = _srs_context->get_id();
ip = srs_get_peer_ip(st_netfd_fileno(stfd));
ret = do_cycle();
// if socket io error, set to closed.
if (srs_is_client_gracefully_close(ret)) {
ret = ERROR_SOCKET_CLOSED;
}
// success.
if (ret == ERROR_SUCCESS) {
srs_trace("client finished.");
}
// client close peer.
if (ret == ERROR_SOCKET_CLOSED) {
srs_warn("###client disconnect peer. ret=%d", ret);
}
return ERROR_SUCCESS;
}
int SrsHttpConn::do_cycle()
{
int ret = ERROR_SUCCESS;
srs_trace("HTTP client ip=%s", ip.c_str());
srs_trace("******* int SrsHttpConn::do_cycle()**********");
srs_trace("******* @@@@@@@@@@@@@@@@@*********");
// initialize parser初始化解析器
if ((ret = parser->initialize(HTTP_request)) != ERROR_SUCCESS) {
srs_error("http initiali2ze http parser failed. ret=%d", ret);
return ret;
}
// underlayer socke创建底层socket
SrsStSocket skt(stfd);
// set the recv timeout, for some clients never disconnect the connection.设置超时时间
// @see https://github.com/ossrs/srs/issues/398
skt.set_recv_timeout(SRS_HTTP_RECV_TIMEOUT_US);
// process http messages. 处理http消息
while (!disposed) {
ISrsHttpMessage* req = NULL;
// get a http message
if ((ret = parser->parse_message(&skt, this, &req)) != ERROR_SUCCESS) {
return ret;
}
// if SUCCESS, always NOT-NULL.
srs_assert(req);
// always free it in this scope. 销毁
SrsAutoFree(ISrsHttpMessage, req);
// may should discard the body.
if ((ret = on_got_http_message(req)) != ERROR_SUCCESS) {
return ret;
}
// ok, handle http request. 处理http请求
SrsHttpResponseWriter writer(&skt);
if ((ret = process_request(&writer, req)) != ERROR_SUCCESS) {
return ret;
}
// donot keep alive, disconnect it.
// @see https://github.com/ossrs/srs/issues/399
if (!req->is_keep_alive()) {
break;
}
}
srs_trace("******* @@@@end SrsHttpConn::do_cycle() @@@@@@@@@@@@@*********");
return ret;
}
srs_trace("############ ## #int SrsFFMPEG::start()###################");
int SrsFFMPEG::start()
{
int ret = ERROR_SUCCESS;
srs_trace("############ ## #int SrsFFMPEG::start()###################");
if (started) {
return ret;
}
// prepare exec params
char tmp[256];
std::vector<std::string> params;
// argv[0], set to ffmpeg bin.
// The execv() and execvp() functions ....
// The first argument, by convention, should point to
// the filename associated with the file being executed.
params.push_back(ffmpeg);
// input params
if (!_iparams.empty()) {
std::vector<std::string> iparm = split(_iparams, " ");
for(std::vector<std::string>::iterator it = iparm.begin();
it != iparm.end();it++)
{
//params.push_back(_iparams);
params.push_back(*it);
}
}
// input.
if (iformat != "off" && !iformat.empty()) {
params.push_back("-f");
params.push_back(iformat);
}
params.push_back("-i");
params.push_back(input);
// build the filter
if (!vfilter.empty()) {
std::vector<std::string>::iterator it;
for (it = vfilter.begin(); it != vfilter.end(); ++it) {
std::string p = *it;
if (!p.empty()) {
params.push_back(p);
}
}
}
// video specified.
if (vcodec != SRS_RTMP_ENCODER_NO_VIDEO) {
params.push_back("-vcodec");
params.push_back(vcodec);
} else {
params.push_back("-vn");
}
// the codec params is disabled when copy
if (vcodec != SRS_RTMP_ENCODER_COPY && vcodec != SRS_RTMP_ENCODER_NO_VIDEO) {
if (vbitrate > 0) {
params.push_back("-b:v");
snprintf(tmp, sizeof(tmp), "%d", vbitrate * 1000);
params.push_back(tmp);
}
if (vfps > 0) {
params.push_back("-r");
snprintf(tmp, sizeof(tmp), "%.2f", vfps);
params.push_back(tmp);
}
if (vwidth > 0 && vheight > 0) {
params.push_back("-s");
snprintf(tmp, sizeof(tmp), "%dx%d", vwidth, vheight);
params.push_back(tmp);
}
// TODO: add aspect if needed.
if (vwidth > 0 && vheight > 0) {
params.push_back("-aspect");
snprintf(tmp, sizeof(tmp), "%d:%d", vwidth, vheight);
params.push_back(tmp);
}
if (vthreads > 0) {
params.push_back("-threads");
snprintf(tmp, sizeof(tmp), "%d", vthreads);
params.push_back(tmp);
}
params.push_back("-profile:v");
params.push_back(vprofile);
params.push_back("-preset");
params.push_back(vpreset);
// vparams
if (!vparams.empty()) {
std::vector<std::string>::iterator it;
for (it = vparams.begin(); it != vparams.end(); ++it) {
std::string p = *it;
if (!p.empty()) {
params.push_back(p);
}
}
}
}
// audio specified.
if (acodec != SRS_RTMP_ENCODER_NO_AUDIO) {
params.push_back("-acodec");
params.push_back(acodec);
} else {
params.push_back("-an");
}
// the codec params is disabled when copy
if (acodec != SRS_RTMP_ENCODER_NO_AUDIO) {
if (acodec != SRS_RTMP_ENCODER_COPY) {
if (abitrate > 0) {
params.push_back("-b:a");
snprintf(tmp, sizeof(tmp), "%d", abitrate * 1000);
params.push_back(tmp);
}
if (asample_rate > 0) {
params.push_back("-ar");
snprintf(tmp, sizeof(tmp), "%d", asample_rate);
params.push_back(tmp);
}
if (achannels > 0) {
params.push_back("-ac");
snprintf(tmp, sizeof(tmp), "%d", achannels);
params.push_back(tmp);
}
// aparams
std::vector<std::string>::iterator it;
for (it = aparams.begin(); it != aparams.end(); ++it) {
std::string p = *it;
if (!p.empty()) {
params.push_back(p);
}
}
} else {
// for audio copy.
for (int i = 0; i < (int)aparams.size();) {
std::string pn = aparams[i++];
// aparams, the adts to asc filter "-bsf:a aac_adtstoasc"
if (pn == "-bsf:a" && i < (int)aparams.size()) {
std::string pv = aparams[i++];
if (pv == "aac_adtstoasc") {
params.push_back(pn);
params.push_back(pv);
}
}
}
}
}
// output
if (oformat != "off" && !oformat.empty()) {
params.push_back("-f");
params.push_back(oformat);
}
params.push_back("-y");
params.push_back(_output);
std::string cli;
if (true) {
for (int i = 0; i < (int)params.size(); i++) {
std::string ffp = params[i];
cli += ffp;
if (i < (int)params.size() - 1) {
cli += " ";
}
}
srs_trace("start ffmpeg, log: %s, params: %s", log_file.c_str(), cli.c_str());
}
// for log
int cid = _srs_context->get_id();
// TODO: fork or vfork?
if ((pid = fork()) < 0) {
ret = ERROR_ENCODER_FORK;
srs_error("vfork process failed. ret=%d", ret);
return ret;
}
// child process: ffmpeg encoder engine.
if (pid == 0) {
// ignore the SIGINT and SIGTERM
signal(SIGINT, SIG_IGN);
signal(SIGTERM, SIG_IGN);
// redirect logs to file.
int log_fd = -1;
int flags = O_CREAT|O_WRONLY|O_APPEND;
mode_t mode = S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH;
if ((log_fd = ::open(log_file.c_str(), flags, mode)) < 0) {
ret = ERROR_ENCODER_OPEN;
srs_error("open encoder file %s failed. ret=%d", log_file.c_str(), ret);
exit(ret);
}
// log basic info
if (true) {
char buf[4096];
int pos = 0;
pos += snprintf(buf + pos, sizeof(buf) - pos, "\n");
pos += snprintf(buf + pos, sizeof(buf) - pos, "ffmpeg cid=%d\n", cid);
pos += snprintf(buf + pos, sizeof(buf) - pos, "log=%s\n", log_file.c_str());
pos += snprintf(buf + pos, sizeof(buf) - pos, "params: %s\n", cli.c_str());
::write(log_fd, buf, pos);
}
// dup to stdout and stderr.
if (dup2(log_fd, STDOUT_FILENO) < 0) {
ret = ERROR_ENCODER_DUP2;
srs_error("dup2 encoder file failed. ret=%d", ret);
exit(ret);
}
if (dup2(log_fd, STDERR_FILENO) < 0) {
ret = ERROR_ENCODER_DUP2;
srs_error("dup2 encoder file failed. ret=%d", ret);
exit(ret);
}
// close log fd
::close(log_fd);
// close other fds
// TODO: do in right way.
for (int i = 3; i < 1024; i++) {
::close(i);
}
// memory leak in child process, it's ok.
char** charpv_params = new char*[params.size() + 1];
for (int i = 0; i < (int)params.size(); i++) {
std::string& p = params[i];
charpv_params[i] = (char*)p.data();
}
// EOF: NULL
charpv_params[params.size()] = NULL;
// TODO: execv or execvp
ret = execv(ffmpeg.c_str(), charpv_params);
if (ret < 0) {
fprintf(stderr, "fork ffmpeg failed, errno=%d(%s)",
errno, strerror(errno));
}
exit(ret);
}
// parent.
if (pid > 0) {
started = true;
srs_trace("fork encoder %s, pid=%d", engine_name.c_str(), pid);
return ret;
}
return ret;
}
int SrsLiveStream::serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r)
{
srs_trace("##### SrsLiveStream::serve_http ######");
int ret = ERROR_SUCCESS;
ISrsStreamencoder* enc = NULL;
srs_assert(entry);
if (srs_string_ends_with(entry->pattern, ".flv")) {
w->header()->set_content_type("video/x-flv");
w->header()->set("Access-Control-Allow-Origin", "*");
w->header()->set("Access-Control-Allow-headers", "x-requested-with,content-type,range");
#ifdef SRS_PERF_FAST_FLV_ENCODER
enc = new SrsFastFlvStreamEncoder();
#else
enc = new SrsFlvStreamEncoder();
#endif
} else if (srs_string_ends_with(entry->pattern, ".aac")) {
w->header()->set_content_type("audio/x-aac");
enc = new SrsAacStreamEncoder();
} else if (srs_string_ends_with(entry->pattern, ".mp3")) {
w->header()->set_content_type("audio/mpeg");
enc = new SrsMp3StreamEncoder();
} else if (srs_string_ends_with(entry->pattern, ".ts")) {
w->header()->set_content_type("video/MP2T");
enc = new SrsTsStreamEncoder();
} else {
ret = ERROR_HTTP_LIVE_STREAM_EXT;
srs_error("http: unsupported pattern %s", entry->pattern.c_str());
return ret;
}
SrsAutoFree(ISrsStreamEncoder, enc);
// create consumer of souce, ignore gop cache, use the audio gop cache.
SrsConsumer* consumer = NULL;
if ((ret = source->create_consumer(NULL, consumer, true, true, !enc->has_cache())) != ERROR_SUCCESS) {
srs_error("http: create consumer failed. ret=%d", ret);
return ret;
}
SrsAutoFree(SrsConsumer, consumer);
srs_verbose("http: consumer created success.");
SrsPithyPrint* pprint = SrsPithyPrint::create_http_stream();
SrsAutoFree(SrsPithyPrint, pprint);
SrsMessageArray msgs(SRS_PERF_MW_MSGS);
// the memory writer.
Srsstreamwriter writer(w);
if ((ret = enc->initialize(&writer, cache)) != ERROR_SUCCESS) {
srs_error("http: initialize stream encoder failed. ret=%d", ret);
return ret;
}
// if gop cache enabled for encoder, dump to consumer.
if (enc->has_cache()) {
if ((ret = enc->dump_cache(consumer, source->jitter())) != ERROR_SUCCESS) {
srs_error("http: dump cache to consumer failed. ret=%d", ret);
return ret;
}
}
#ifdef SRS_PERF_FAST_FLV_ENCODER
SrsFastFlvStreamEncoder* ffe = dynamic_cast<SrsFastFlvStreamEncoder*>(enc);
#endif
// Use receive thread to accept the close event to avoid FD leak.
// @see https://github.com/ossrs/srs/issues/636#issuecomment-298208427
SrsHttpMessage* hr = dynamic_cast<SrsHttpMessage*>(r);
SrsResponseOnlyHttpConn* hc = dynamic_cast<SrsResponseOnlyHttpConn*>(hr->connection());
SrsHttpRecvThread* trd = new SrsHttpRecvThread(hc);
SrsAutoFree(SrsHttpRecvThread, trd);
if ((ret = trd->start()) != ERROR_SUCCESS) {
srs_error("http: start notify thread failed, ret=%d", ret);
return ret;
}
int time_count=0;
// TODO: free and erase the disabled entry after all related connections is closed.
while (entry->enabled) {
pprint->elapse();
// Whether client closed the FD.
if ((ret = trd->error_code()) != ERROR_SUCCESS) {
return ret;
}
// get messages from consumer.
// each msg in msgs.msgs must be free, for the SrsMessageArray never free them.
int count = 0;
srs_trace("## int SrsOneCycleThread::cycle() ##");
if ((ret = consumer->dump_packets(&msgs, count)) != ERROR_SUCCESS) {
srs_error("http: get messages from consumer failed. ret=%d", ret);
return ret;
}
if (count <= 0) {
srs_info("http: sleep %dms for no msg", SRS_constS_RTMP_PULSE_TIMEOUT_US);
// directly use sleep, donot use consumer wait.
st_usleep(SRS_CONSTS_RTMP_PULSE_TIMEOUT_US);
// ignore when nothing got.
time_count++;
if(time_count>=60)
{
return 1004;
}
continue;
}
if (pprint->can_print()) {
srs_trace("###-> "SRS_CONSTS_LOG_HTTP_STREAM" http: got %d msgs, age=%d, min=%d, mw=%d",
count, pprint->age(), SRS_PERF_MW_MIN_MSGS, SRS_CONSTS_RTMP_PULSE_TIMEOUT_US / 1000);
}
// sendout all messages.
#ifdef SRS_PERF_FAST_FLV_ENCODER
if (ffe) {
time_count=0;
ret = ffe->write_tags(msgs.msgs, count);
} else {
time_count=0;
ret = streaming_send_messages(enc, msgs.msgs, count);
}
#else
ret = streaming_send_messages(enc, msgs.msgs, count);
#endif
// free the messages.
for (int i = 0; i < count; i++) {
SrsSharedPtrMessage* msg = msgs.msgs[i];
srs_freep(msg);
}
// check send error code.
if (ret != ERROR_SUCCESS) {
if (!srs_is_client_gracefully_close(ret)) {
srs_error("http: send messages to client failed. ret=%d", ret);
}
return ret;
}
}
return ret;
}
相关阅读
SRS 代码分析【mpeg-ts解析】1.SrsTsContext的decode接口定义如下:int SrsTsContext::decode(SrsBuffer* stream, ISrsTsHandler*
今天一网友发信息给小编说,他打开计算机后在资源管理器里面看到一个进程:“Csrss.exe进程”,网友说其他进程都有描述信息,但是此进程
SONY拥有各种各样的“黑科技”,并且源源不断地将这些科技体现在产品上,索尼SRS-WS1这套颈挂式扬声器就是款很Amazing的个
SRS premium sound 音效增强工具到底有没有用
SRS premium sound是戴尔笔记本上随机附赠的一款模拟立体环绕声的音效软件。其目的是为用户提供良好的视听感受。在音响上模拟环
SRS Audio Sandbox SRS音效增强软件怎么使用?
SRS音效增强软件是利用操作系统的底层技术,以驱动程序的形式装入系统。不论采用何种声卡,任何媒体播放程序,这款软件都能够为你的PC