国产一级a片免费看高清,亚洲熟女中文字幕在线视频,黄三级高清在线播放,免费黄色视频在线看

打開APP
userphoto
未登錄

開通VIP,暢享免費(fèi)電子書等14項(xiàng)超值服

開通VIP
簡潔rtmp源站服務(wù)器
公司在做編碼器,需要和rtmp服務(wù)器對(duì)接,nginx-rtmp實(shí)在是太復(fù)雜,不方便調(diào)試,所以做了一個(gè)簡單rtmp服務(wù)器。
參與這個(gè)項(xiàng)目的幾個(gè)朋友,說不僅僅想實(shí)現(xiàn)流媒體源站功能,還想寫出漂亮的代碼,成為“國人軟件代碼之典范”。
哈哈,目標(biāo)是遠(yuǎn)大的,代碼得一點(diǎn)一點(diǎn)寫。
https://github.com/winlinvip/simple-rtmp-server
一路輸入,多路(實(shí)例是12路輸出):
支持Flash推流,直接進(jìn)行轉(zhuǎn)碼支持HLS:
主要的定位是做RTMP/HLS流媒體核心業(yè)務(wù),支持RTMP集群(源站/邊緣,或Forward方式),支持多進(jìn)程和單進(jìn)程,支持vhost,不支持點(diǎn)播。
因?yàn)楹唵?,所以穩(wěn)定;不是那個(gè)級(jí)別的穩(wěn)定,是那個(gè)級(jí)別的穩(wěn)定,非常穩(wěn)定,沒有出錯(cuò)的可能性。
盡管這樣,也是基于state-threads的高性能服務(wù)器,一個(gè)進(jìn)程一個(gè)線程,異步socket,能成為最好的流服務(wù)器。
而且,最方便的是邏輯簡單,方便調(diào)試,沒有異步的回調(diào),全是同步函數(shù)。
RTMP協(xié)議收發(fā)包完整實(shí)現(xiàn),510行就全部搞定。
確實(shí)很簡單,發(fā)送RTMP包使用writev,一個(gè)函數(shù)就可以搞定:
[cpp] view plain copy
/**
* send out message with encoded payload to peer.
* use the message encode method to encode to payload,
* then sendout over socket.
* @msg this method will free it whatever return value.
*/
virtual int send_message(SrsMessage* msg);
實(shí)現(xiàn)如下:
[cpp] view plain copy
int SrsProtocol::send_message(SrsMessage* msg)
{
int ret = ERROR_SUCCESS;
if ((ret = msg->encode_packet()) != ERROR_SUCCESS) {
srs_error("encode packet to message payload failed. ret=%d", ret);
return ret;
}
srs_info("encode packet to message payload success");
// p set to current write position,
// it's ok when payload is NULL and size is 0.
char* p = (char*)msg->payload;
// always write the header event payload is empty.
do {
// generate the header.
char* pheader = NULL;
int header_size = 0;
if (p == (char*)msg->payload) {
// write new chunk stream header, fmt is 0
pheader = out_header_fmt0;
*pheader++ = 0x00 | (msg->get_perfer_cid() & 0x3F);
// chunk message header, 11 bytes
// timestamp, 3bytes, big-endian
if (msg->header.timestamp >= RTMP_EXTENDED_TIMESTAMP) {
*pheader++ = 0xFF;
*pheader++ = 0xFF;
*pheader++ = 0xFF;
} else {
pp = (char*)&msg->header.timestamp;
*pheader++ = pp[2];
*pheader++ = pp[1];
*pheader++ = pp[0];
}
// message_length, 3bytes, big-endian
pp = (char*)&msg->header.payload_length;
*pheader++ = pp[2];
*pheader++ = pp[1];
*pheader++ = pp[0];
// message_type, 1bytes
*pheader++ = msg->header.message_type;
// message_length, 3bytes, little-endian
pp = (char*)&msg->header.stream_id;
*pheader++ = pp[0];
*pheader++ = pp[1];
*pheader++ = pp[2];
*pheader++ = pp[3];
// chunk extended timestamp header, 0 or 4 bytes, big-endian
if(msg->header.timestamp >= RTMP_EXTENDED_TIMESTAMP){
pp = (char*)&msg->header.timestamp;
*pheader++ = pp[3];
*pheader++ = pp[2];
*pheader++ = pp[1];
*pheader++ = pp[0];
}
header_size = pheader - out_header_fmt0;
pheader = out_header_fmt0;
} else {
// write no message header chunk stream, fmt is 3
pheader = out_header_fmt3;
*pheader++ = 0xC0 | (msg->get_perfer_cid() & 0x3F);
// chunk extended timestamp header, 0 or 4 bytes, big-endian
if(msg->header.timestamp >= RTMP_EXTENDED_TIMESTAMP){
pp = (char*)&msg->header.timestamp;
*pheader++ = pp[3];
*pheader++ = pp[2];
*pheader++ = pp[1];
*pheader++ = pp[0];
}
header_size = pheader - out_header_fmt3;
pheader = out_header_fmt3;
}
// sendout header and payload by writev.
// decrease the sys invoke count to get higher performance.
int payload_size = msg->size - (p - (char*)msg->payload);
if (payload_size > out_chunk_size) {
payload_size = out_chunk_size;
}
// send by writev
iovec iov[2];
iov[0].iov_base = pheader;
iov[0].iov_len = header_size;
iov[1].iov_base = p;
iov[1].iov_len = payload_size;
ssize_t nwrite;
if ((ret = skt->writev(iov, 2, &nwrite)) != ERROR_SUCCESS) {
srs_error("send with writev failed. ret=%d", ret);
return ret;
}
// consume sendout bytes when not empty packet.
if (msg->payload && msg->size > 0) {
p += payload_size;
}
} while (p < (char*)msg->payload + msg->size);
return ret;
}
收RTMP包麻煩一點(diǎn),需要五個(gè)函數(shù)(只有recv_message是public的):
[cpp] view plain copy
/**
* recv a message with raw/undecoded payload from peer.
* the payload is not decoded, use srs_rtmp_expect_message<T> if requires
* specifies message.
* @pmsg, user must free it. NULL if not success.
* @remark, only when success, user can use and must free the pmsg.
*/
virtual int recv_message(SrsMessage** pmsg);
/**
* try to recv interlaced message from peer,
* return error if error occur and nerver set the pmsg,
* return success and pmsg set to NULL if no entire message got,
* return success and pmsg set to entire message if got one.
*/
virtual int recv_interlaced_message(SrsMessage** pmsg);
/**
* read the chunk basic header(fmt, cid) from chunk stream.
* user can discovery a SrsChunkStream by cid.
* @bh_size return the chunk basic header size, to remove the used bytes when finished.
*/
virtual int read_basic_header(char& fmt, int& cid, int& bh_size);
/**
* read the chunk message header(timestamp, payload_length, message_type, stream_id)
* from chunk stream and save to SrsChunkStream.
* @mh_size return the chunk message header size, to remove the used bytes when finished.
*/
virtual int read_message_header(SrsChunkStream* chunk, char fmt, int bh_size, int& mh_size);
/**
* read the chunk payload, remove the used bytes in buffer,
* if got entire message, set the pmsg.
* @payload_size read size in this roundtrip, generally a chunk size or left message size.
*/
virtual int read_message_payload(SrsChunkStream* chunk, int bh_size, int mh_size, int& payload_size, SrsMessage** pmsg);
實(shí)現(xiàn)如下:
[cpp] view plain copy
int SrsProtocol::recv_message(SrsMessage** pmsg)
{
*pmsg = NULL;
int ret = ERROR_SUCCESS;
while (true) {
SrsMessage* msg = NULL;
if ((ret = recv_interlaced_message(&msg)) != ERROR_SUCCESS) {
srs_error("recv interlaced message failed. ret=%d", ret);
return ret;
}
srs_verbose("entire msg received");
if (!msg) {
continue;
}
if (msg->size <= 0 || msg->header.payload_length <= 0) {
srs_trace("ignore empty message(type=%d, size=%d, time=%d, sid=%d).",
msg->header.message_type, msg->header.payload_length,
msg->header.timestamp, msg->header.stream_id);
delete msg;
continue;
}
srs_verbose("get a msg with raw/undecoded payload");
*pmsg = msg;
break;
}
return ret;
}
int SrsProtocol::recv_interlaced_message(SrsMessage** pmsg)
{
int ret = ERROR_SUCCESS;
// chunk stream basic header.
char fmt = 0;
int cid = 0;
int bh_size = 0;
if ((ret = read_basic_header(fmt, cid, bh_size)) != ERROR_SUCCESS) {
srs_error("read basic header failed. ret=%d", ret);
return ret;
}
srs_info("read basic header success. fmt=%d, cid=%d, bh_size=%d", fmt, cid, bh_size);
// get the cached chunk stream.
SrsChunkStream* chunk = NULL;
if (chunk_streams.find(cid) == chunk_streams.end()) {
chunk = chunk_streams[cid] = new SrsChunkStream(cid);
srs_info("cache new chunk stream: fmt=%d, cid=%d", fmt, cid);
} else {
chunk = chunk_streams[cid];
srs_info("cached chunk stream: fmt=%d, cid=%d, size=%d, message(type=%d, size=%d, time=%d, sid=%d)",
chunk->fmt, chunk->cid, (chunk->msg? chunk->msg->size : 0), chunk->header.message_type, chunk->header.payload_length,
chunk->header.timestamp, chunk->header.stream_id);
}
// chunk stream message header
int mh_size = 0;
if ((ret = read_message_header(chunk, fmt, bh_size, mh_size)) != ERROR_SUCCESS) {
srs_error("read message header failed. ret=%d", ret);
return ret;
}
srs_info("read message header success. "
"fmt=%d, mh_size=%d, ext_time=%d, size=%d, message(type=%d, size=%d, time=%d, sid=%d)",
fmt, mh_size, chunk->extended_timestamp, (chunk->msg? chunk->msg->size : 0), chunk->header.message_type,
chunk->header.payload_length, chunk->header.timestamp, chunk->header.stream_id);
// read msg payload from chunk stream.
SrsMessage* msg = NULL;
int payload_size = 0;
if ((ret = read_message_payload(chunk, bh_size, mh_size, payload_size, &msg)) != ERROR_SUCCESS) {
srs_error("read message payload failed. ret=%d", ret);
return ret;
}
// not got an entire RTMP message, try next chunk.
if (!msg) {
srs_info("get partial message success. chunk_payload_size=%d, size=%d, message(type=%d, size=%d, time=%d, sid=%d)",
payload_size, (msg? msg->size : (chunk->msg? chunk->msg->size : 0)), chunk->header.message_type, chunk->header.payload_length,
chunk->header.timestamp, chunk->header.stream_id);
return ret;
}
*pmsg = msg;
srs_info("get entire message success. chunk_payload_size=%d, size=%d, message(type=%d, size=%d, time=%d, sid=%d)",
payload_size, (msg? msg->size : (chunk->msg? chunk->msg->size : 0)), chunk->header.message_type, chunk->header.payload_length,
chunk->header.timestamp, chunk->header.stream_id);
return ret;
}
int SrsProtocol::read_basic_header(char& fmt, int& cid, int& bh_size)
{
int ret = ERROR_SUCCESS;
int required_size = 1;
if ((ret = buffer->ensure_buffer_bytes(skt, required_size)) != ERROR_SUCCESS) {
srs_error("read 1bytes basic header failed. required_size=%d, ret=%d", required_size, ret);
return ret;
}
char* p = buffer->bytes();
fmt = (*p >> 6) & 0x03;
cid = *p & 0x3f;
bh_size = 1;
if (cid > 1) {
srs_verbose("%dbytes basic header parsed. fmt=%d, cid=%d", bh_size, fmt, cid);
return ret;
}
if (cid == 0) {
required_size = 2;
if ((ret = buffer->ensure_buffer_bytes(skt, required_size)) != ERROR_SUCCESS) {
srs_error("read 2bytes basic header failed. required_size=%d, ret=%d", required_size, ret);
return ret;
}
cid = 64;
cid += *(++p);
bh_size = 2;
srs_verbose("%dbytes basic header parsed. fmt=%d, cid=%d", bh_size, fmt, cid);
} else if (cid == 1) {
required_size = 3;
if ((ret = buffer->ensure_buffer_bytes(skt, 3)) != ERROR_SUCCESS) {
srs_error("read 3bytes basic header failed. required_size=%d, ret=%d", required_size, ret);
return ret;
}
cid = 64;
cid += *(++p);
cid += *(++p) * 256;
bh_size = 3;
srs_verbose("%dbytes basic header parsed. fmt=%d, cid=%d", bh_size, fmt, cid);
} else {
srs_error("invalid path, impossible basic header.");
srs_assert(false);
}
return ret;
}
int SrsProtocol::read_message_header(SrsChunkStream* chunk, char fmt, int bh_size, int& mh_size)
{
int ret = ERROR_SUCCESS;
// when not exists cached msg, means get an new message,
// the fmt must be type0 which means new message.
if (!chunk->msg && fmt != RTMP_FMT_TYPE0) {
ret = ERROR_RTMP_CHUNK_START;
srs_error("chunk stream start, "
"fmt must be %d, actual is %d. ret=%d", RTMP_FMT_TYPE0, fmt, ret);
return ret;
}
// when exists cache msg, means got an partial message,
// the fmt must not be type0 which means new message.
if (chunk->msg && fmt == RTMP_FMT_TYPE0) {
ret = ERROR_RTMP_CHUNK_START;
srs_error("chunk stream exists, "
"fmt must not be %d, actual is %d. ret=%d", RTMP_FMT_TYPE0, fmt, ret);
return ret;
}
// create msg when new chunk stream start
if (!chunk->msg) {
srs_assert(fmt == RTMP_FMT_TYPE0);
chunk->msg = new SrsMessage();
srs_verbose("create message for new chunk, fmt=%d, cid=%d", fmt, chunk->cid);
}
// read message header from socket to buffer.
static char mh_sizes[] = {11, 7, 1, 0};
mh_size = mh_sizes[(int)fmt];
srs_verbose("calc chunk message header size. fmt=%d, mh_size=%d", fmt, mh_size);
int required_size = bh_size + mh_size;
if ((ret = buffer->ensure_buffer_bytes(skt, required_size)) != ERROR_SUCCESS) {
srs_error("read %dbytes message header failed. required_size=%d, ret=%d", mh_size, required_size, ret);
return ret;
}
char* p = buffer->bytes() + bh_size;
// parse the message header.
// see also: ngx_rtmp_recv
if (fmt <= RTMP_FMT_TYPE2) {
int32_t timestamp_delta;
char* pp = (char*)×tamp_delta;
pp[2] = *p++;
pp[1] = *p++;
pp[0] = *p++;
pp[3] = 0;
if (fmt == RTMP_FMT_TYPE0) {
// 6.1.2.1. Type 0
// For a type-0 chunk, the absolute timestamp of the message is sent
// here.
chunk->header.timestamp = timestamp_delta;
} else {
// 6.1.2.2. Type 1
// 6.1.2.3. Type 2
// For a type-1 or type-2 chunk, the difference between the previous
// chunk's timestamp and the current chunk's timestamp is sent here.
chunk->header.timestamp += timestamp_delta;
}
// fmt: 0
// timestamp: 3 bytes
// If the timestamp is greater than or equal to 16777215
// (hexadecimal 0x00ffffff), this value MUST be 16777215, and the
// ‘extended timestamp header’ MUST be present. Otherwise, this value
// SHOULD be the entire timestamp.
//
// fmt: 1 or 2
// timestamp delta: 3 bytes
// If the delta is greater than or equal to 16777215 (hexadecimal
// 0x00ffffff), this value MUST be 16777215, and the ‘extended
// timestamp header’ MUST be present. Otherwise, this value SHOULD be
// the entire delta.
chunk->extended_timestamp = (timestamp_delta >= RTMP_EXTENDED_TIMESTAMP);
if (chunk->extended_timestamp) {
chunk->header.timestamp = RTMP_EXTENDED_TIMESTAMP;
}
if (fmt <= RTMP_FMT_TYPE1) {
pp = (char*)&chunk->header.payload_length;
pp[2] = *p++;
pp[1] = *p++;
pp[0] = *p++;
pp[3] = 0;
chunk->header.message_type = *p++;
if (fmt == 0) {
pp = (char*)&chunk->header.stream_id;
pp[0] = *p++;
pp[1] = *p++;
pp[2] = *p++;
pp[3] = *p++;
srs_verbose("header read completed. fmt=%d, mh_size=%d, ext_time=%d, time=%d, payload=%d, type=%d, sid=%d",
fmt, mh_size, chunk->extended_timestamp, chunk->header.timestamp, chunk->header.payload_length,
chunk->header.message_type, chunk->header.stream_id);
} else {
srs_verbose("header read completed. fmt=%d, mh_size=%d, ext_time=%d, time=%d, payload=%d, type=%d",
fmt, mh_size, chunk->extended_timestamp, chunk->header.timestamp, chunk->header.payload_length,
chunk->header.message_type);
}
} else {
srs_verbose("header read completed. fmt=%d, mh_size=%d, ext_time=%d, time=%d",
fmt, mh_size, chunk->extended_timestamp, chunk->header.timestamp);
}
} else {
srs_verbose("header read completed. fmt=%d, size=%d, ext_time=%d",
fmt, mh_size, chunk->extended_timestamp);
}
if (chunk->extended_timestamp) {
mh_size += 4;
required_size = bh_size + mh_size;
srs_verbose("read header ext time. fmt=%d, ext_time=%d, mh_size=%d", fmt, chunk->extended_timestamp, mh_size);
if ((ret = buffer->ensure_buffer_bytes(skt, required_size)) != ERROR_SUCCESS) {
srs_error("read %dbytes message header failed. required_size=%d, ret=%d", mh_size, required_size, ret);
return ret;
}
char* pp = (char*)&chunk->header.timestamp;
pp[3] = *p++;
pp[2] = *p++;
pp[1] = *p++;
pp[0] = *p++;
srs_verbose("header read ext_time completed. time=%d", chunk->header.timestamp);
}
// valid message
if (chunk->header.payload_length < 0) {
ret = ERROR_RTMP_MSG_INVLIAD_SIZE;
srs_error("RTMP message size must not be negative. size=%d, ret=%d",
chunk->header.payload_length, ret);
return ret;
}
// copy header to msg
chunk->msg->header = chunk->header;
return ret;
}
int SrsProtocol::read_message_payload(SrsChunkStream* chunk, int bh_size, int mh_size, int& payload_size, SrsMessage** pmsg)
{
int ret = ERROR_SUCCESS;
// empty message
if (chunk->header.payload_length == 0) {
// need erase the header in buffer.
buffer->erase(bh_size + mh_size);
srs_trace("get an empty RTMP "
"message(type=%d, size=%d, time=%d, sid=%d)", chunk->header.message_type,
chunk->header.payload_length, chunk->header.timestamp, chunk->header.stream_id);
*pmsg = chunk->msg;
chunk->msg = NULL;
return ret;
}
srs_assert(chunk->header.payload_length > 0);
// the chunk payload size.
payload_size = chunk->header.payload_length - chunk->msg->size;
if (payload_size > in_chunk_size) {
payload_size = in_chunk_size;
}
srs_verbose("chunk payload size is %d, message_size=%d, received_size=%d, in_chunk_size=%d",
payload_size, chunk->header.payload_length, chunk->msg->size, in_chunk_size);
// create msg payload if not initialized
if (!chunk->msg->payload) {
chunk->msg->payload = new int8_t[chunk->header.payload_length];
memset(chunk->msg->payload, 0, chunk->header.payload_length);
srs_verbose("create empty payload for RTMP message. size=%d", chunk->header.payload_length);
}
// read payload to buffer
int required_size = bh_size + mh_size + payload_size;
if ((ret = buffer->ensure_buffer_bytes(skt, required_size)) != ERROR_SUCCESS) {
srs_error("read payload failed. required_size=%d, ret=%d", required_size, ret);
return ret;
}
memcpy(chunk->msg->payload + chunk->msg->size, buffer->bytes() + bh_size + mh_size, payload_size);
buffer->erase(bh_size + mh_size + payload_size);
chunk->msg->size += payload_size;
srs_verbose("chunk payload read complted. bh_size=%d, mh_size=%d, payload_size=%d", bh_size, mh_size, payload_size);
// got entire RTMP message?
if (chunk->header.payload_length == chunk->msg->size) {
*pmsg = chunk->msg;
chunk->msg = NULL;
srs_verbose("get entire RTMP message(type=%d, size=%d, time=%d, sid=%d)",
chunk->header.message_type, chunk->header.payload_length,
chunk->header.timestamp, chunk->header.stream_id);
return ret;
}
srs_verbose("get partial RTMP message(type=%d, size=%d, time=%d, sid=%d), partial size=%d",
chunk->header.message_type, chunk->header.payload_length,
chunk->header.timestamp, chunk->header.stream_id,
chunk->msg->size);
return ret;
}
NGINX-RTMP對(duì)應(yīng)的代碼無數(shù)地方無數(shù)行,復(fù)雜得無與倫比。
http://blog.csdn.NET/win_lin/article/details/12844375
本站僅提供存儲(chǔ)服務(wù),所有內(nèi)容均由用戶發(fā)布,如發(fā)現(xiàn)有害或侵權(quán)內(nèi)容,請(qǐng)點(diǎn)擊舉報(bào)。
打開APP,閱讀全文并永久保存 查看更多類似文章
猜你喜歡
類似文章
詳解RTP協(xié)議之H264封包和解包實(shí)戰(zhàn)
python如何實(shí)現(xiàn)遠(yuǎn)程控制電腦(結(jié)合微信)
Websocket消息幀粘包,拆包及處理方法
koa2實(shí)現(xiàn)jwt登錄
手把手教你用Python網(wǎng)絡(luò)爬蟲+自動(dòng)化來創(chuàng)建一位屬于你自己的虛擬女票(附源碼)
python3使用smtplib通過qq郵箱發(fā)送郵件ok
更多類似文章 >>
生活服務(wù)
分享 收藏 導(dǎo)長圖 關(guān)注 下載文章
綁定賬號(hào)成功
后續(xù)可登錄賬號(hào)暢享VIP特權(quán)!
如果VIP功能使用有故障,
可點(diǎn)擊這里聯(lián)系客服!

聯(lián)系客服