Java通过FFmpeg实现RTP多路推流
0
package com.acgist.stream;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.security.cert.X509Certificate;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.TimeUnit;
import javax.net.ssl.HostnameVerifier;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLSession;
import javax.net.ssl.TrustManager;
import javax.net.ssl.X509TrustManager;
import org.json.JSONException;
import org.json.JSONObject;
import io.socket.client.IO;
import io.socket.client.Socket;
import io.socket.engineio.client.transports.WebSocket;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import okhttp3.OkHttpClient;
/**
* 媒体工具
*
* @author acgist
*/
@Slf4j
@Getter
public class Streamer {
private RtpServer rtpServer;
private Process ffmpegProcess;
private boolean broadcast = false;
private DatagramSocket ffmpegSocket;
private final Timer timer = new Timer();
private final List<Socket> sockets = new ArrayList<>();
private final List<RtpServer> clients = new ArrayList<>();
/**
 * Opens a signaling connection for the given client and blocks until it is connected.
 *
 * <p>The socket uses the WebSocket transport only and runs on an OkHttp client that
 * accepts every TLS certificate and every host name. After the connection is up, the
 * periodic heartbeat is scheduled and the socket is added to {@code sockets}.
 *
 * <p>NOTE(review): certificate and host name validation are switched off here,
 * presumably to allow self-signed signaling servers - confirm before production use.
 * <br>NOTE(review): the wait loop has no upper bound, and reconnection attempts are
 * unlimited, so this call does not return while the server stays unreachable.
 *
 * @param clientId client identifier placed in the connection query
 * @param address  signaling server address
 *
 * @return the connected signaling socket
 *
 * @throws Exception if the TLS context or the socket cannot be created, or the waiting thread is interrupted
 */
public Socket connect(String clientId, String address) throws Exception {
    // Trust manager that performs no certificate validation at all.
    final X509TrustManager trustAll = new X509TrustManager() {
        @Override
        public X509Certificate[] getAcceptedIssuers() {
            return new X509Certificate[0];
        }
        @Override
        public void checkClientTrusted(X509Certificate[] certificates, String name) {
        }
        @Override
        public void checkServerTrusted(X509Certificate[] certificates, String name) {
        }
    };
    // Every host name is accepted regardless of the presented certificate.
    final HostnameVerifier acceptAnyHost = (hostname, sslSession) -> true;
    final SSLContext tls = SSLContext.getInstance("TLSv1.2");
    tls.init(null, new TrustManager[] { trustAll }, null);
    final OkHttpClient httpClient = new OkHttpClient.Builder()
        .readTimeout(30, TimeUnit.SECONDS)
        .hostnameVerifier(acceptAnyHost)
        .sslSocketFactory(tls.getSocketFactory(), trustAll)
        .build();
    // JVM-wide switch: affects every user of the JDK HTTP client in this process.
    System.getProperties().setProperty(
        "jdk.internal.httpclient.disableHostnameVerification",
        Boolean.TRUE.toString()
    );
    final IO.Options config = new IO.Options();
    config.query = "clientId=" + clientId + "&role=client";
    config.timeout = 5000;
    config.transports = new String[] { WebSocket.NAME };
    config.callFactory = httpClient;
    config.webSocketFactory = httpClient;
    config.reconnectionDelay = 5000;
    config.reconnectionAttempts = Integer.MAX_VALUE;
    final Socket signaling = IO.socket(address, config);
    signaling.on(Socket.EVENT_CONNECT, args -> log.info("信令连接"));
    signaling.on(Socket.EVENT_DISCONNECT, args -> log.info("信令断开:{}", args));
    signaling.on(Socket.EVENT_CONNECT_ERROR, args -> log.info("信令异常:{}", args));
    signaling.connect();
    // Poll until the asynchronous connect has completed.
    final long pollMillis = 50;
    while (!signaling.connected()) {
        Thread.sleep(pollMillis);
    }
    this.heartbeat(signaling);
    this.sockets.add(signaling);
    return signaling;
}
/**
 * Schedules the periodic device heartbeat on a signaling socket.
 *
 * <p>Per the original author's note, the heartbeat is mandatory: without it the
 * signaling connection times out and is dropped. The first heartbeat is sent after
 * 30 seconds and then every 30 seconds, always reporting a fixed status
 * (signal 100, battery 100, charging).
 *
 * @param socket signaling socket that emits the heartbeat
 */
public void heartbeat(Socket socket) {
    final long periodMillis = 30 * 1000;
    final TimerTask beat = new TimerTask() {
        @Override
        public void run() {
            final Map<String, Object> status = Map.of(
                // signal strength
                "signal", 100,
                // battery level
                "battery", 100,
                // whether the device is charging
                "charging", true
            );
            socket.emit("client::heartbeat", new Object[] { status });
        }
    };
    this.timer.schedule(beat, periodMillis, periodMillis);
}
/**
 * Publishes an audio producer for a client over the signaling socket.
 *
 * <p>Two signaling requests are chained: "client::transport::plain" is emitted first,
 * and from its acknowledgement the fields ip, port, rtcpPort and transportId are read.
 * "client::media::produce" is then emitted for an opus audio codec (payload type 100,
 * 48000 Hz, 2 channels) with the given SSRC. When the second acknowledgement arrives,
 * a new {@code RtpServer} for the returned ip is stored in the {@code rtpServer} field
 * and its audio SSRC and ports are filled in.
 *
 * <p>Both acknowledgements are handled in callbacks, so this method returns before
 * the {@code rtpServer} field has been assigned.
 *
 * @param ssrc     SSRC announced for the audio stream
 * @param clientId client identifier sent with both requests
 * @param socket   signaling socket used to emit the requests
 */
public void publishAudio(int ssrc, String clientId, Socket socket) {
// Step 1: request a transport that only produces media.
socket.emit("client::transport::plain", new Object[] {
Map.of(
"clientId", clientId,
"producing", true,
"consuming", false,
// The capabilities are sent as JSON text (a String), not as a parsed object.
"rtpCapabilities", """
{
"codecs": [{
"kind": "audio",
"mimeType": "audio/PCMU",
"preferredPayloadType": 0,
"clockRate": 8000,
"rtcpFeedback": [{
"type": "transport-cc",
"parameter": ""
}],
"channels": 1,
"parameters": {}
}, {
"kind": "audio",
"mimeType": "audio/PCMA",
"preferredPayloadType": 8,
"clockRate": 8000,
"rtcpFeedback": [{
"type": "transport-cc",
"parameter": ""
}],
"channels": 1,
"parameters": {}
}, {
"kind": "audio",
"mimeType": "audio/opus",
"clockRate": 48000,
"channels": 2,
"rtcpFeedback": [{
"type": "nack",
"parameter": ""
}, {
"type": "transport-cc",
"parameter": ""
}],
"parameters": {},
"preferredPayloadType": 100
}, {
"kind": "video",
"mimeType": "video/H264",
"clockRate": 90000,
"parameters": {
"level-asymmetry-allowed": 1,
"packetization-mode": 1,
"profile-level-id": "4d0032",
"x-google-start-bitrate": 1000
},
"rtcpFeedback": [{
"type": "nack",
"parameter": ""
}, {
"type": "nack",
"parameter": "pli"
}, {
"type": "ccm",
"parameter": "fir"
}, {
"type": "goog-remb",
"parameter": ""
}, {
"type": "transport-cc",
"parameter": ""
}],
"preferredPayloadType": 101
}, {
"kind": "video",
"mimeType": "video/rtx",
"preferredPayloadType": 102,
"clockRate": 90000,
"parameters": {
"apt": 101
},
"rtcpFeedback": []
}, {
"kind": "video",
"mimeType": "video/H264",
"clockRate": 90000,
"parameters": {
"level-asymmetry-allowed": 1,
"packetization-mode": 1,
"profile-level-id": "42e01f",
"x-google-start-bitrate": 1000
},
"rtcpFeedback": [{
"type": "nack",
"parameter": ""
}, {
"type": "nack",
"parameter": "pli"
}, {
"type": "ccm",
"parameter": "fir"
}, {
"type": "goog-remb",
"parameter": ""
}, {
"type": "transport-cc",
"parameter": ""
}],
"preferredPayloadType": 103
}, {
"kind": "video",
"mimeType": "video/rtx",
"preferredPayloadType": 104,
"clockRate": 90000,
"parameters": {
"apt": 103
},
"rtcpFeedback": []
}, {
"kind": "video",
"mimeType": "video/VP8",
"clockRate": 90000,
"rtcpFeedback": [{
"type": "nack",
"parameter": ""
}, {
"type": "nack",
"parameter": "pli"
}, {
"type": "ccm",
"parameter": "fir"
}, {
"type": "goog-remb",
"parameter": ""
}, {
"type": "transport-cc",
"parameter": ""
}],
"parameters": {
"x-google-start-bitrate": 1000
},
"preferredPayloadType": 105
}, {
"kind": "video",
"mimeType": "video/rtx",
"preferredPayloadType": 106,
"clockRate": 90000,
"parameters": {
"apt": 105
},
"rtcpFeedback": []
}, {
"kind": "video",
"mimeType": "video/VP9",
"clockRate": 90000,
"rtcpFeedback": [{
"type": "nack",
"parameter": ""
}, {
"type": "nack",
"parameter": "pli"
}, {
"type": "ccm",
"parameter": "fir"
}, {
"type": "goog-remb",
"parameter": ""
}, {
"type": "transport-cc",
"parameter": ""
}],
"parameters": {
"profile-id": 2,
"x-google-start-bitrate": 1000
},
"preferredPayloadType": 107
}, {
"kind": "video",
"mimeType": "video/rtx",
"preferredPayloadType": 108,
"clockRate": 90000,
"parameters": {
"apt": 107
},
"rtcpFeedback": []
}],
"headerExtensions": [{
"kind": "audio",
"uri": "urn:ietf:params:rtp-hdrext:sdes:mid",
"preferredId": 1,
"preferredEncrypt": false,
"direction": "sendrecv"
}, {
"kind": "video",
"uri": "urn:ietf:params:rtp-hdrext:sdes:mid",
"preferredId": 1,
"preferredEncrypt": false,
"direction": "sendrecv"
}, {
"kind": "video",
"uri": "urn:ietf:params:rtp-hdrext:sdes:rtp-stream-id",
"preferredId": 2,
"preferredEncrypt": false,
"direction": "recvonly"
}, {
"kind": "video",
"uri": "urn:ietf:params:rtp-hdrext:sdes:repaired-rtp-stream-id",
"preferredId": 3,
"preferredEncrypt": false,
"direction": "recvonly"
}, {
"kind": "audio",
"uri": "http://www.webrtc.org/experiments/rtp-hdrext/abs-send-time",
"preferredId": 4,
"preferredEncrypt": false,
"direction": "sendrecv"
}, {
"kind": "video",
"uri": "http://www.webrtc.org/experiments/rtp-hdrext/abs-send-time",
"preferredId": 4,
"preferredEncrypt": false,
"direction": "sendrecv"
}, {
"kind": "audio",
"uri": "http://www.ietf.org/id/draft-holmer-rmcat-transport-wide-cc-extensions-01",
"preferredId": 5,
"preferredEncrypt": false,
"direction": "recvonly"
}, {
"kind": "video",
"uri": "http://www.ietf.org/id/draft-holmer-rmcat-transport-wide-cc-extensions-01",
"preferredId": 5,
"preferredEncrypt": false,
"direction": "sendrecv"
}, {
"kind": "video",
"uri": "http://tools.ietf.org/html/draft-ietf-avtext-framemarking-07",
"preferredId": 6,
"preferredEncrypt": false,
"direction": "sendrecv"
}, {
"kind": "video",
"uri": "urn:ietf:params:rtp-hdrext:framemarking",
"preferredId": 7,
"preferredEncrypt": false,
"direction": "sendrecv"
}, {
"kind": "audio",
"uri": "urn:ietf:params:rtp-hdrext:ssrc-audio-level",
"preferredId": 10,
"preferredEncrypt": false,
"direction": "sendrecv"
}, {
"kind": "video",
"uri": "urn:3gpp:video-orientation",
"preferredId": 11,
"preferredEncrypt": false,
"direction": "sendrecv"
}, {
"kind": "video",
"uri": "urn:ietf:params:rtp-hdrext:toffset",
"preferredId": 12,
"preferredEncrypt": false,
"direction": "sendrecv"
}, {
"kind": "audio",
"uri": "http://www.webrtc.org/experiments/rtp-hdrext/abs-capture-time",
"preferredId": 13,
"preferredEncrypt": false,
"direction": "sendrecv"
}, {
"kind": "video",
"uri": "http://www.webrtc.org/experiments/rtp-hdrext/abs-capture-time",
"preferredId": 13,
"preferredEncrypt": false,
"direction": "sendrecv"
}]
}
"""
)
}, (Object ... plainArgs) -> {
// Acknowledgement of the transport request; the first argument is cast to a JSON object.
final JSONObject transport = (JSONObject) plainArgs[0];
String ip;
String port;
String rtcpPort;
String transportId;
try {
// All four fields are read as strings; getString throws JSONException on failure.
ip = transport.getString("ip");
port = transport.getString("port");
rtcpPort = transport.getString("rtcpPort");
transportId = transport.getString("transportId");
} catch (JSONException e) {
// Without the transport details nothing can be produced, so give up here.
log.error("解析响应异常:{}", transport, e);
return;
}
// Step 2: announce the audio producer on the transport just created.
socket.emit("client::media::produce", new Object[] {
Map.of(
"kind", "audio",
"clientId", clientId,
"transportId", transportId,
"appData", Map.of(),
"rtpParameters", Map.of(
"codecs", List.of(Map.of(
"mimeType", "audio/opus",
"channels", 2,
"clockRate", 48000,
"payloadType", 100
/*
"parameters", Map.of(
"usedtx", 1,
"minptime", 10,
"useinbandfec", 1,
"sprop-stereo", 1
)
*/
)),
"encodings", List.of(Map.of(
/*
"dtx", false,
*/
"ssrc", ssrc
))
)
)
}, (Object ... produceArgs) -> {
final JSONObject json = (JSONObject) produceArgs[0];
log.info("生产音频媒体:{} - {}", ip, json.toString());
// Start a fresh forwarding target for this client; only the audio half is filled in here.
this.rtpServer = new RtpServer(ip);
this.rtpServer.putAudio(ssrc, port, rtcpPort);
});
});
}
/**
 * Publishes a video producer for a client over the signaling socket.
 *
 * <p>Two signaling requests are chained: "client::transport::plain" is emitted first,
 * and from its acknowledgement the fields ip, port, rtcpPort and transportId are read.
 * "client::media::produce" is then emitted for an H264 video codec (payload type 101,
 * 90000 Hz) with the given SSRC. When the second acknowledgement arrives, the video
 * SSRC and ports are written into the object held by the {@code rtpServer} field, that
 * object is appended to {@code clients}, and the field is reset to {@code null}.
 *
 * <p>NOTE(review): the final callback dereferences {@code this.rtpServer} without a
 * null check. In the code visible here that field is only assigned by the audio
 * produce callback of {@code publishAudio}, so this method appears to require that the
 * audio publication for the same client has already completed - confirm with callers.
 *
 * @param ssrc     SSRC announced for the video stream
 * @param clientId client identifier sent with both requests
 * @param socket   signaling socket used to emit the requests
 */
public void publishVideo(int ssrc, String clientId, Socket socket) {
// Step 1: request a transport that only produces media.
socket.emit("client::transport::plain", new Object[] {
Map.of(
"clientId", clientId,
"producing", true,
"consuming", false,
// The capabilities are sent as JSON text (a String), not as a parsed object.
"rtpCapabilities", """
{
"codecs": [{
"kind": "audio",
"mimeType": "audio/PCMU",
"preferredPayloadType": 0,
"clockRate": 8000,
"rtcpFeedback": [{
"type": "transport-cc",
"parameter": ""
}],
"channels": 1,
"parameters": {}
}, {
"kind": "audio",
"mimeType": "audio/PCMA",
"preferredPayloadType": 8,
"clockRate": 8000,
"rtcpFeedback": [{
"type": "transport-cc",
"parameter": ""
}],
"channels": 1,
"parameters": {}
}, {
"kind": "audio",
"mimeType": "audio/opus",
"clockRate": 48000,
"channels": 2,
"rtcpFeedback": [{
"type": "nack",
"parameter": ""
}, {
"type": "transport-cc",
"parameter": ""
}],
"parameters": {},
"preferredPayloadType": 100
}, {
"kind": "video",
"mimeType": "video/H264",
"clockRate": 90000,
"parameters": {
"level-asymmetry-allowed": 1,
"packetization-mode": 1,
"profile-level-id": "4d0032",
"x-google-start-bitrate": 1000
},
"rtcpFeedback": [{
"type": "nack",
"parameter": ""
}, {
"type": "nack",
"parameter": "pli"
}, {
"type": "ccm",
"parameter": "fir"
}, {
"type": "goog-remb",
"parameter": ""
}, {
"type": "transport-cc",
"parameter": ""
}],
"preferredPayloadType": 101
}, {
"kind": "video",
"mimeType": "video/rtx",
"preferredPayloadType": 102,
"clockRate": 90000,
"parameters": {
"apt": 101
},
"rtcpFeedback": []
}, {
"kind": "video",
"mimeType": "video/H264",
"clockRate": 90000,
"parameters": {
"level-asymmetry-allowed": 1,
"packetization-mode": 1,
"profile-level-id": "42e01f",
"x-google-start-bitrate": 1000
},
"rtcpFeedback": [{
"type": "nack",
"parameter": ""
}, {
"type": "nack",
"parameter": "pli"
}, {
"type": "ccm",
"parameter": "fir"
}, {
"type": "goog-remb",
"parameter": ""
}, {
"type": "transport-cc",
"parameter": ""
}],
"preferredPayloadType": 103
}, {
"kind": "video",
"mimeType": "video/rtx",
"preferredPayloadType": 104,
"clockRate": 90000,
"parameters": {
"apt": 103
},
"rtcpFeedback": []
}, {
"kind": "video",
"mimeType": "video/VP8",
"clockRate": 90000,
"rtcpFeedback": [{
"type": "nack",
"parameter": ""
}, {
"type": "nack",
"parameter": "pli"
}, {
"type": "ccm",
"parameter": "fir"
}, {
"type": "goog-remb",
"parameter": ""
}, {
"type": "transport-cc",
"parameter": ""
}],
"parameters": {
"x-google-start-bitrate": 1000
},
"preferredPayloadType": 105
}, {
"kind": "video",
"mimeType": "video/rtx",
"preferredPayloadType": 106,
"clockRate": 90000,
"parameters": {
"apt": 105
},
"rtcpFeedback": []
}, {
"kind": "video",
"mimeType": "video/VP9",
"clockRate": 90000,
"rtcpFeedback": [{
"type": "nack",
"parameter": ""
}, {
"type": "nack",
"parameter": "pli"
}, {
"type": "ccm",
"parameter": "fir"
}, {
"type": "goog-remb",
"parameter": ""
}, {
"type": "transport-cc",
"parameter": ""
}],
"parameters": {
"profile-id": 2,
"x-google-start-bitrate": 1000
},
"preferredPayloadType": 107
}, {
"kind": "video",
"mimeType": "video/rtx",
"preferredPayloadType": 108,
"clockRate": 90000,
"parameters": {
"apt": 107
},
"rtcpFeedback": []
}],
"headerExtensions": [{
"kind": "audio",
"uri": "urn:ietf:params:rtp-hdrext:sdes:mid",
"preferredId": 1,
"preferredEncrypt": false,
"direction": "sendrecv"
}, {
"kind": "video",
"uri": "urn:ietf:params:rtp-hdrext:sdes:mid",
"preferredId": 1,
"preferredEncrypt": false,
"direction": "sendrecv"
}, {
"kind": "video",
"uri": "urn:ietf:params:rtp-hdrext:sdes:rtp-stream-id",
"preferredId": 2,
"preferredEncrypt": false,
"direction": "recvonly"
}, {
"kind": "video",
"uri": "urn:ietf:params:rtp-hdrext:sdes:repaired-rtp-stream-id",
"preferredId": 3,
"preferredEncrypt": false,
"direction": "recvonly"
}, {
"kind": "audio",
"uri": "http://www.webrtc.org/experiments/rtp-hdrext/abs-send-time",
"preferredId": 4,
"preferredEncrypt": false,
"direction": "sendrecv"
}, {
"kind": "video",
"uri": "http://www.webrtc.org/experiments/rtp-hdrext/abs-send-time",
"preferredId": 4,
"preferredEncrypt": false,
"direction": "sendrecv"
}, {
"kind": "audio",
"uri": "http://www.ietf.org/id/draft-holmer-rmcat-transport-wide-cc-extensions-01",
"preferredId": 5,
"preferredEncrypt": false,
"direction": "recvonly"
}, {
"kind": "video",
"uri": "http://www.ietf.org/id/draft-holmer-rmcat-transport-wide-cc-extensions-01",
"preferredId": 5,
"preferredEncrypt": false,
"direction": "sendrecv"
}, {
"kind": "video",
"uri": "http://tools.ietf.org/html/draft-ietf-avtext-framemarking-07",
"preferredId": 6,
"preferredEncrypt": false,
"direction": "sendrecv"
}, {
"kind": "video",
"uri": "urn:ietf:params:rtp-hdrext:framemarking",
"preferredId": 7,
"preferredEncrypt": false,
"direction": "sendrecv"
}, {
"kind": "audio",
"uri": "urn:ietf:params:rtp-hdrext:ssrc-audio-level",
"preferredId": 10,
"preferredEncrypt": false,
"direction": "sendrecv"
}, {
"kind": "video",
"uri": "urn:3gpp:video-orientation",
"preferredId": 11,
"preferredEncrypt": false,
"direction": "sendrecv"
}, {
"kind": "video",
"uri": "urn:ietf:params:rtp-hdrext:toffset",
"preferredId": 12,
"preferredEncrypt": false,
"direction": "sendrecv"
}, {
"kind": "audio",
"uri": "http://www.webrtc.org/experiments/rtp-hdrext/abs-capture-time",
"preferredId": 13,
"preferredEncrypt": false,
"direction": "sendrecv"
}, {
"kind": "video",
"uri": "http://www.webrtc.org/experiments/rtp-hdrext/abs-capture-time",
"preferredId": 13,
"preferredEncrypt": false,
"direction": "sendrecv"
}]
}
"""
)
}, (Object ... plainArgs) -> {
// Acknowledgement of the transport request; the first argument is cast to a JSON object.
final JSONObject transport = (JSONObject) plainArgs[0];
String ip;
String port;
String rtcpPort;
String transportId;
try {
// All four fields are read as strings; getString throws JSONException on failure.
ip = transport.getString("ip");
port = transport.getString("port");
rtcpPort = transport.getString("rtcpPort");
transportId = transport.getString("transportId");
} catch (JSONException e) {
// Without the transport details nothing can be produced, so give up here.
log.error("解析响应异常:{}", transport, e);
return;
}
// Step 2: announce the video producer on the transport just created.
socket.emit("client::media::produce", new Object[] {
Map.of(
"kind", "video",
"clientId", clientId,
"transportId", transportId,
"appData", Map.of(),
"rtpParameters", Map.of(
"codecs", List.of(Map.of(
"mimeType", "video/h264",
"clockRate", 90000,
"payloadType", 101,
"rtcpFeedback", List.of(),
"parameters", Map.of(
"packetization-mode", 1,
"profile-level-id", "4d0032",
"level-asymmetry-allowed", 1
)
)),
"encodings", List.of(Map.of(
"ssrc", ssrc
))
)
)
}, (Object ... produceArgs) -> {
final JSONObject json = (JSONObject) produceArgs[0];
log.info("生产视频媒体:{} - {}", ip, json.toString());
// Complete the pending forwarding target with its video half and register it.
// NOTE(review): throws NullPointerException when this.rtpServer has not been assigned yet.
this.rtpServer.putVideo(ssrc, port, rtcpPort);
this.clients.add(this.rtpServer);
// Clear the pending slot so the next audio publication starts a new target.
this.rtpServer = null;
});
});
}
/**
 * Runs a shell command and blocks until it has terminated, then logs its output.
 *
 * <p>The command is passed to {@code /bin/bash -c} when the file separator is '/',
 * otherwise to {@code cmd /c}. Standard output and standard error are each drained by
 * a daemon thread so the child cannot stall on a full pipe. On the {@code cmd} path the
 * output is decoded as GBK, otherwise with the platform default charset.
 *
 * <p>Only one command is tracked at a time: while the {@code ffmpegProcess} field is
 * set the call returns immediately. This method never clears the field itself;
 * {@code close()} does.
 *
 * <p>The command string is handed to a shell unmodified, so callers must not build it
 * from untrusted input.
 *
 * @param command shell command line to execute
 */
public void executeCommand(String command) {
    if (this.ffmpegProcess != null) {
        log.info("已经开始执行命令");
        return;
    }
    // Upper bound for collecting remaining output after the process has exited; a
    // grandchild that keeps the pipe open must not block this method forever.
    final long drainTimeoutMillis = 5000;
    try {
        final boolean linux = File.separatorChar == '/';
        final ProcessBuilder processBuilder = linux
            ? new ProcessBuilder("/bin/bash", "-c", command)
            : new ProcessBuilder("cmd", "/c", command);
        // Work on a local reference: close() may null the field from another thread.
        final Process process = processBuilder.start();
        this.ffmpegProcess = process;
        final StringBuffer input = new StringBuffer();
        final StringBuffer error = new StringBuffer();
        try (
            InputStream inputStream = process.getInputStream();
            InputStream errorStream = process.getErrorStream()
        ) {
            final Thread inputThread = this.readOutput(inputStream, linux, input);
            final Thread errorThread = this.readOutput(errorStream, linux, error);
            process.waitFor();
            // Let the readers finish before the streams are closed and the text is
            // logged; otherwise the captured output can be empty or cut short.
            inputThread.join(drainTimeoutMillis);
            errorThread.join(drainTimeoutMillis);
        }
        log.info("命令执行完成:{}", input);
        log.info("命令执行完成:{}", error);
    } catch (InterruptedException e) {
        // Keep the interrupt visible to whoever owns this thread.
        Thread.currentThread().interrupt();
        log.error("执行命令异常:{}", command, e);
    } catch (Exception e) {
        log.error("执行命令异常:{}", command, e);
    }
}
/**
 * Starts a daemon thread that reads a process stream to its end and appends the
 * decoded text to the target buffer.
 *
 * @param stream stream to drain
 * @param linux  {@code true} to decode with the platform default charset, {@code false} for GBK
 * @param target buffer receiving the decoded text
 *
 * @return the started reader thread
 */
private Thread readOutput(InputStream stream, boolean linux, StringBuffer target) {
    final Thread reader = new Thread(() -> {
        try {
            final byte[] bytes = stream.readAllBytes();
            target.append(linux ? new String(bytes) : new String(bytes, "GBK"));
        } catch (Exception e) {
            log.error("结果读取异常", e);
        }
    });
    reader.setDaemon(true);
    reader.start();
    return reader;
}
/**
 * Starts ffmpeg and relays the RTP packets it produces to every registered client.
 *
 * <p>A daemon thread binds UDP port 9898 on this machine and launches ffmpeg so that it
 * sends H264 video (SSRC 10000, payload type 101) and opus audio (SSRC 10001, payload
 * type 100) to that port. For every received RTP packet the SSRC in bytes 8..11 of the
 * header is replaced with the client's own video or audio SSRC, and the packet is sent
 * to that client's video or audio port.
 *
 * <p>Datagrams shorter than the 12-byte RTP header, and datagrams whose SSRC field is
 * neither 10000 nor 10001, are dropped instead of being forwarded as audio. The latter
 * includes the RTCP that ffmpeg sends to the same port.
 *
 * <p>Only one broadcast runs at a time; the flag is cleared by {@code close()}.
 *
 * @param video path or URL of the media handed to ffmpeg as input
 * @param fps   output frame rate
 * @param crf   constant rate factor (quality) for the video encoder
 */
public void broadcast(String video, int fps, int crf) {
    if (this.broadcast) {
        log.info("已经开始广播");
        return;
    }
    this.broadcast = true;
    final Thread broadcastThread = new Thread(() -> {
        final int port = 9898;
        final int videoSsrc = 10000;
        final int audioSsrc = 10001;
        // Fixed RTP header size; the SSRC occupies its last four bytes (offset 8).
        final int rtpHeaderLength = 12;
        // try-with-resources releases the port even when the relay loop fails.
        try (DatagramSocket socket = new DatagramSocket(port)) {
            this.ffmpegSocket = socket;
            final String host = InetAddress.getLocalHost().getHostAddress();
            // ffmpeg pushes both streams to this machine; RTP and RTCP share one port.
            final String command = String.format("""
                ffmpeg -re -stream_loop -1 -i %s -map 0:v:0 -c:v h264 -r %d -crf %d -map 0:a:0 -c:a libopus -ab 96k -ac 2 -ar 48000 -f tee "[select=v:f=rtp:ssrc=%d:payload_type=%d]rtp://%s:%d?rtcpport=%d|[select=a:f=rtp:ssrc=%d:payload_type=%d]rtp://%s:%d?rtcpport=%d"
                """,
                video, fps, crf,
                videoSsrc, 101, host, port, port,
                audioSsrc, 100, host, port, port
            );
            log.info("执行命令:{}", command);
            final Thread commandThread = new Thread(() -> this.executeCommand(command));
            commandThread.setDaemon(true);
            commandThread.start();
            final byte[] receiveBuffer = new byte[8096];
            final DatagramPacket packet = new DatagramPacket(receiveBuffer, receiveBuffer.length);
            // The local socket reference is used on purpose: close() nulls the field.
            while (this.broadcast && !socket.isClosed()) {
                // receive() shrinks the packet length; restore it before reuse.
                packet.setLength(receiveBuffer.length);
                socket.receive(packet);
                if (packet.getLength() < rtpHeaderLength) {
                    continue;
                }
                final int ssrc = ByteBuffer.wrap(packet.getData(), packet.getOffset() + 8, 4).getInt();
                final boolean videoPacket = ssrc == videoSsrc;
                if (!videoPacket && ssrc != audioSsrc) {
                    // Not one of the two RTP streams (for example RTCP): nothing to relay.
                    continue;
                }
                this.clients.forEach(server -> this.forward(socket, packet, server, videoPacket));
            }
        } catch (Exception e) {
            log.error("转发异常", e);
        }
    });
    broadcastThread.setDaemon(true);
    broadcastThread.start();
}
/**
 * Rewrites the SSRC of a received RTP packet for one client and sends it to that
 * client's video or audio port. A failure is logged and does not affect other clients.
 *
 * @param socket socket used for sending
 * @param packet received packet; its buffer is modified in place
 * @param server forwarding target
 * @param video  {@code true} for the video stream, {@code false} for audio
 */
private void forward(DatagramSocket socket, DatagramPacket packet, RtpServer server, boolean video) {
    try {
        final byte[] data = packet.getData();
        System.arraycopy(video ? server.videoSsrcBytes : server.audioSsrcBytes, 0, data, packet.getOffset() + 8, 4);
        final DatagramPacket datagramPacket = new DatagramPacket(
            data,
            packet.getOffset(),
            packet.getLength(),
            new InetSocketAddress(server.ip, video ? server.videoPort : server.audioPort)
        );
        socket.send(datagramPacket);
    } catch (Exception e) {
        log.error("转发异常", e);
    }
}
/**
 * Stops broadcasting and releases the process, signaling sockets and relay socket.
 *
 * <p>The running process is first asked to quit by writing {@code 'q'} to its standard
 * input and is given up to one second to exit. After that the process and every
 * descendant that existed when this method was called are killed forcibly.
 *
 * <p>NOTE(review): the heartbeat {@code timer} is not cancelled here, so its thread
 * and already scheduled heartbeat tasks outlive this call. Cancelling it would make
 * the final timer unusable for a later {@code connect} - decide whether the instance
 * is meant to be reused after closing.
 */
public void close() {
    this.broadcast = false;
    // Snapshot the field so concurrent changes cannot cause a NullPointerException.
    final Process process = this.ffmpegProcess;
    this.ffmpegProcess = null;
    if (process != null) {
        // Capture descendants up front: they can be re-parented once the shell exits.
        final ProcessHandle[] descendants = process.descendants().toArray(ProcessHandle[]::new);
        try {
            final OutputStream outputStream = process.getOutputStream();
            outputStream.write('q');
            // The process stdin is buffered; without flush the 'q' is never delivered.
            outputStream.flush();
            process.waitFor(1, TimeUnit.SECONDS);
        } catch (IOException e) {
            log.error("推出线程异常", e);
        } catch (InterruptedException e) {
            // Restore the flag and fall through to the forced shutdown.
            Thread.currentThread().interrupt();
        }
        for (final ProcessHandle descendant : descendants) {
            descendant.destroyForcibly();
        }
        process.destroyForcibly();
    }
    this.sockets.forEach(Socket::disconnect);
    this.sockets.clear();
    this.clients.clear();
    final DatagramSocket relaySocket = this.ffmpegSocket;
    this.ffmpegSocket = null;
    if (relaySocket != null) {
        relaySocket.close();
    }
}
/**
 * Forwarding target for one client: the remote address plus the SSRC, RTP port and
 * RTCP port of its audio and video streams. Each SSRC is also kept as four big-endian
 * bytes so it can be copied straight into an RTP header.
 */
static class RtpServer {
    public String ip;
    public int audioSsrc;
    public int audioPort;
    public int audioRtcpPort;
    public byte[] audioSsrcBytes;
    public int videoSsrc;
    public int videoPort;
    public int videoRtcpPort;
    public byte[] videoSsrcBytes;
    /**
     * @param ip remote address packets are forwarded to
     */
    public RtpServer(String ip) {
        this.ip = ip;
    }
    /**
     * Records the audio stream parameters.
     *
     * @param ssrc     audio SSRC
     * @param port     audio RTP port as a decimal string
     * @param rtcpPort audio RTCP port as a decimal string
     *
     * @throws NumberFormatException if a port is not a valid integer
     */
    public void putAudio(int ssrc, String port, String rtcpPort) {
        this.audioSsrc = ssrc;
        this.audioPort = Integer.parseInt(port);
        this.audioRtcpPort = Integer.parseInt(rtcpPort);
        this.audioSsrcBytes = encode(ssrc);
    }
    /**
     * Records the video stream parameters.
     *
     * @param ssrc     video SSRC
     * @param port     video RTP port as a decimal string
     * @param rtcpPort video RTCP port as a decimal string
     *
     * @throws NumberFormatException if a port is not a valid integer
     */
    public void putVideo(int ssrc, String port, String rtcpPort) {
        this.videoSsrc = ssrc;
        this.videoPort = Integer.parseInt(port);
        this.videoRtcpPort = Integer.parseInt(rtcpPort);
        this.videoSsrcBytes = encode(ssrc);
    }
    /** Encodes an SSRC as four bytes in big-endian order. */
    private static byte[] encode(int ssrc) {
        return ByteBuffer.allocate(Integer.BYTES).putInt(ssrc).array();
    }
}
}