Java通过FFmpeg实现rtp多路推流

0

package com.acgist.stream;

import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.security.cert.X509Certificate;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.TimeUnit;

import javax.net.ssl.HostnameVerifier;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLSession;
import javax.net.ssl.TrustManager;
import javax.net.ssl.X509TrustManager;

import org.json.JSONException;
import org.json.JSONObject;

import io.socket.client.IO;
import io.socket.client.Socket;
import io.socket.engineio.client.transports.WebSocket;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import okhttp3.OkHttpClient;

/**
 * 媒体工具
 * 
 * @author acgist
 */
@Slf4j
@Getter
public class Streamer {
    
    private RtpServer rtpServer;
    private Process ffmpegProcess;
    private boolean broadcast = false;
    private DatagramSocket ffmpegSocket;
    private final Timer timer = new Timer();
    private final List<Socket> sockets = new ArrayList<>();
    private final List<RtpServer> clients = new ArrayList<>();

    /**
     * 连接信令
     * 
     * @param clientId 终端ID
     * @param address  信令地址
     * 
     * @return 信令连接
     * 
     * @throws Exception 异常
     */
    public Socket connect(String clientId, String address) throws Exception {
        final HostnameVerifier hostnameVerifier = new HostnameVerifier() {
            @Override
            public boolean verify(String hostname, SSLSession sslSession) {
                return true;
            }
        };
        final X509TrustManager trustManager = new X509TrustManager() {
            public X509Certificate[] getAcceptedIssuers() {
                return new X509Certificate[] {};
            }
            @Override
            public void checkClientTrusted(X509Certificate[] certificates, String name) {
            }
            @Override
            public void checkServerTrusted(X509Certificate[] certificates, String name) {
            }
        };
        final SSLContext sslContext = SSLContext.getInstance("TLSv1.2");
        sslContext.init(null, new TrustManager[] { trustManager }, null);
        final OkHttpClient okHttpClient = new OkHttpClient.Builder()
            .readTimeout(30, TimeUnit.SECONDS)
            .hostnameVerifier(hostnameVerifier)
            .sslSocketFactory(sslContext.getSocketFactory(), trustManager)
            .build();
        final Properties properties = System.getProperties();
        properties.setProperty("jdk.internal.httpclient.disableHostnameVerification", Boolean.TRUE.toString());
        final IO.Options options     = new IO.Options();
        options.query                = "clientId=" + clientId + "&role=client";
//      options.query                = "clientId=web&role=client&username=media&password=media";
        options.timeout              = 5000;
        options.transports           = new String[]{ WebSocket.NAME };
        options.callFactory          = okHttpClient;
        options.webSocketFactory     = okHttpClient;
        options.reconnectionDelay    = 5000;
        options.reconnectionAttempts = Integer.MAX_VALUE;
        final Socket socket = IO.socket(address, options);
        socket.on(Socket.EVENT_CONNECT, (Object ... args) -> {
            log.info("信令连接");
        });
        socket.on(Socket.EVENT_DISCONNECT, (Object ... args) -> {
            log.info("信令断开:{}", args);
        });
        socket.on(Socket.EVENT_CONNECT_ERROR, (Object ... args) -> {
            log.info("信令异常:{}", args);
        });
        socket.connect();
        while(!socket.connected()) {
            Thread.sleep(50);
        }
        this.heartbeat(socket);
        this.sockets.add(socket);
        return socket;
    }
    
    /**
     * 设备心跳
     * 必须实现否则信令会超时断开
     * 
     * @param socket
     */
    public void heartbeat(Socket socket) {
        this.timer.schedule(new TimerTask() {
            @Override
            public void run() {
                socket.emit("client::heartbeat", new Object[] {
                    Map.of(
                        // 信号强度
                        "signal",   100,
                        // 电池电量
                        "battery",  100,
                        // 是否充电
                        "charging", true
                    )
                });
            }
        },30 * 1000, 30 * 1000);
    }
    
    /**
     * 发布音频
     * 
     * @param ssrc     SSRC
     * @param clientId ClientID
     * @param socket   Socket
     */
    public void publishAudio(int ssrc, String clientId, Socket socket) {
        socket.emit("client::transport::plain", new Object[] {
            Map.of(
                "clientId",        clientId,
                "producing",       true,
                "consuming",       false,
                "rtpCapabilities", """
                    {
                        "codecs": [{
                            "kind": "audio",
                            "mimeType": "audio/PCMU",
                            "preferredPayloadType": 0,
                            "clockRate": 8000,
                            "rtcpFeedback": [{
                                "type": "transport-cc",
                                "parameter": ""
                            }],
                            "channels": 1,
                            "parameters": {}
                        }, {
                            "kind": "audio",
                            "mimeType": "audio/PCMA",
                            "preferredPayloadType": 8,
                            "clockRate": 8000,
                            "rtcpFeedback": [{
                                "type": "transport-cc",
                                "parameter": ""
                            }],
                            "channels": 1,
                            "parameters": {}
                        }, {
                            "kind": "audio",
                            "mimeType": "audio/opus",
                            "clockRate": 48000,
                            "channels": 2,
                            "rtcpFeedback": [{
                                "type": "nack",
                                "parameter": ""
                            }, {
                                "type": "transport-cc",
                                "parameter": ""
                            }],
                            "parameters": {},
                            "preferredPayloadType": 100
                        }, {
                            "kind": "video",
                            "mimeType": "video/H264",
                            "clockRate": 90000,
                            "parameters": {
                                "level-asymmetry-allowed": 1,
                                "packetization-mode": 1,
                                "profile-level-id": "4d0032",
                                "x-google-start-bitrate": 1000
                            },
                            "rtcpFeedback": [{
                                "type": "nack",
                                "parameter": ""
                            }, {
                                "type": "nack",
                                "parameter": "pli"
                            }, {
                                "type": "ccm",
                                "parameter": "fir"
                            }, {
                                "type": "goog-remb",
                                "parameter": ""
                            }, {
                                "type": "transport-cc",
                                "parameter": ""
                            }],
                            "preferredPayloadType": 101
                        }, {
                            "kind": "video",
                            "mimeType": "video/rtx",
                            "preferredPayloadType": 102,
                            "clockRate": 90000,
                            "parameters": {
                                "apt": 101
                            },
                            "rtcpFeedback": []
                        }, {
                            "kind": "video",
                            "mimeType": "video/H264",
                            "clockRate": 90000,
                            "parameters": {
                                "level-asymmetry-allowed": 1,
                                "packetization-mode": 1,
                                "profile-level-id": "42e01f",
                                "x-google-start-bitrate": 1000
                            },
                            "rtcpFeedback": [{
                                "type": "nack",
                                "parameter": ""
                            }, {
                                "type": "nack",
                                "parameter": "pli"
                            }, {
                                "type": "ccm",
                                "parameter": "fir"
                            }, {
                                "type": "goog-remb",
                                "parameter": ""
                            }, {
                                "type": "transport-cc",
                                "parameter": ""
                            }],
                            "preferredPayloadType": 103
                        }, {
                            "kind": "video",
                            "mimeType": "video/rtx",
                            "preferredPayloadType": 104,
                            "clockRate": 90000,
                            "parameters": {
                                "apt": 103
                            },
                            "rtcpFeedback": []
                        }, {
                            "kind": "video",
                            "mimeType": "video/VP8",
                            "clockRate": 90000,
                            "rtcpFeedback": [{
                                "type": "nack",
                                "parameter": ""
                            }, {
                                "type": "nack",
                                "parameter": "pli"
                            }, {
                                "type": "ccm",
                                "parameter": "fir"
                            }, {
                                "type": "goog-remb",
                                "parameter": ""
                            }, {
                                "type": "transport-cc",
                                "parameter": ""
                            }],
                            "parameters": {
                                "x-google-start-bitrate": 1000
                            },
                            "preferredPayloadType": 105
                        }, {
                            "kind": "video",
                            "mimeType": "video/rtx",
                            "preferredPayloadType": 106,
                            "clockRate": 90000,
                            "parameters": {
                                "apt": 105
                            },
                            "rtcpFeedback": []
                        }, {
                            "kind": "video",
                            "mimeType": "video/VP9",
                            "clockRate": 90000,
                            "rtcpFeedback": [{
                                "type": "nack",
                                "parameter": ""
                            }, {
                                "type": "nack",
                                "parameter": "pli"
                            }, {
                                "type": "ccm",
                                "parameter": "fir"
                            }, {
                                "type": "goog-remb",
                                "parameter": ""
                            }, {
                                "type": "transport-cc",
                                "parameter": ""
                            }],
                            "parameters": {
                                "profile-id": 2,
                                "x-google-start-bitrate": 1000
                            },
                            "preferredPayloadType": 107
                        }, {
                            "kind": "video",
                            "mimeType": "video/rtx",
                            "preferredPayloadType": 108,
                            "clockRate": 90000,
                            "parameters": {
                                "apt": 107
                            },
                            "rtcpFeedback": []
                        }],
                        "headerExtensions": [{
                            "kind": "audio",
                            "uri": "urn:ietf:params:rtp-hdrext:sdes:mid",
                            "preferredId": 1,
                            "preferredEncrypt": false,
                            "direction": "sendrecv"
                        }, {
                            "kind": "video",
                            "uri": "urn:ietf:params:rtp-hdrext:sdes:mid",
                            "preferredId": 1,
                            "preferredEncrypt": false,
                            "direction": "sendrecv"
                        }, {
                            "kind": "video",
                            "uri": "urn:ietf:params:rtp-hdrext:sdes:rtp-stream-id",
                            "preferredId": 2,
                            "preferredEncrypt": false,
                            "direction": "recvonly"
                        }, {
                            "kind": "video",
                            "uri": "urn:ietf:params:rtp-hdrext:sdes:repaired-rtp-stream-id",
                            "preferredId": 3,
                            "preferredEncrypt": false,
                            "direction": "recvonly"
                        }, {
                            "kind": "audio",
                            "uri": "http://www.webrtc.org/experiments/rtp-hdrext/abs-send-time",
                            "preferredId": 4,
                            "preferredEncrypt": false,
                            "direction": "sendrecv"
                        }, {
                            "kind": "video",
                            "uri": "http://www.webrtc.org/experiments/rtp-hdrext/abs-send-time",
                            "preferredId": 4,
                            "preferredEncrypt": false,
                            "direction": "sendrecv"
                        }, {
                            "kind": "audio",
                            "uri": "http://www.ietf.org/id/draft-holmer-rmcat-transport-wide-cc-extensions-01",
                            "preferredId": 5,
                            "preferredEncrypt": false,
                            "direction": "recvonly"
                        }, {
                            "kind": "video",
                            "uri": "http://www.ietf.org/id/draft-holmer-rmcat-transport-wide-cc-extensions-01",
                            "preferredId": 5,
                            "preferredEncrypt": false,
                            "direction": "sendrecv"
                        }, {
                            "kind": "video",
                            "uri": "http://tools.ietf.org/html/draft-ietf-avtext-framemarking-07",
                            "preferredId": 6,
                            "preferredEncrypt": false,
                            "direction": "sendrecv"
                        }, {
                            "kind": "video",
                            "uri": "urn:ietf:params:rtp-hdrext:framemarking",
                            "preferredId": 7,
                            "preferredEncrypt": false,
                            "direction": "sendrecv"
                        }, {
                            "kind": "audio",
                            "uri": "urn:ietf:params:rtp-hdrext:ssrc-audio-level",
                            "preferredId": 10,
                            "preferredEncrypt": false,
                            "direction": "sendrecv"
                        }, {
                            "kind": "video",
                            "uri": "urn:3gpp:video-orientation",
                            "preferredId": 11,
                            "preferredEncrypt": false,
                            "direction": "sendrecv"
                        }, {
                            "kind": "video",
                            "uri": "urn:ietf:params:rtp-hdrext:toffset",
                            "preferredId": 12,
                            "preferredEncrypt": false,
                            "direction": "sendrecv"
                        }, {
                            "kind": "audio",
                            "uri": "http://www.webrtc.org/experiments/rtp-hdrext/abs-capture-time",
                            "preferredId": 13,
                            "preferredEncrypt": false,
                            "direction": "sendrecv"
                        }, {
                            "kind": "video",
                            "uri": "http://www.webrtc.org/experiments/rtp-hdrext/abs-capture-time",
                            "preferredId": 13,
                            "preferredEncrypt": false,
                            "direction": "sendrecv"
                        }]
                    }
                    """
            )
        }, (Object ... plainArgs) -> {
            final JSONObject transport = (JSONObject) plainArgs[0];
            String ip;
            String port;
            String rtcpPort;
            String transportId;
            try {
                ip          = transport.getString("ip");
                port        = transport.getString("port");
                rtcpPort    = transport.getString("rtcpPort");
                transportId = transport.getString("transportId");
            } catch (JSONException e) {
                log.error("解析响应异常:{}", transport, e);
                return;
            }
            socket.emit("client::media::produce", new Object[] {
                Map.of(
                    "kind",          "audio",
                    "clientId",      clientId,
                    "transportId",   transportId,
                    "appData",       Map.of(),
                    "rtpParameters", Map.of(
                        "codecs", List.of(Map.of(
                            "mimeType",     "audio/opus",
                            "channels",     2,
                            "clockRate",    48000,
                            "payloadType",  100
                            /*
                            "parameters",   Map.of(
                                "usedtx",       1,
                                "minptime",     10,
                                "useinbandfec", 1,
                                "sprop-stereo", 1
                            )
                            */
                        )),
                        "encodings", List.of(Map.of(
                            /*
                            "dtx",  false,
                            */
                            "ssrc", ssrc
                        ))
                    )
                )
            }, (Object ... produceArgs) -> {
                final JSONObject json = (JSONObject) produceArgs[0];
                log.info("生产音频媒体:{} - {}", ip, json.toString());
                this.rtpServer = new RtpServer(ip);
                this.rtpServer.putAudio(ssrc, port, rtcpPort);
            });
        });
    }
    
    /**
     * 发布视频
     * 
     * @param ssrc     SSRC
     * @param clientId ClientID
     * @param socket   Socket
     */
    public void publishVideo(int ssrc, String clientId, Socket socket) {
        socket.emit("client::transport::plain", new Object[] {
            Map.of(
                "clientId",        clientId,
                "producing",       true,
                "consuming",       false,
                "rtpCapabilities", """
                    {
                        "codecs": [{
                            "kind": "audio",
                            "mimeType": "audio/PCMU",
                            "preferredPayloadType": 0,
                            "clockRate": 8000,
                            "rtcpFeedback": [{
                                "type": "transport-cc",
                                "parameter": ""
                            }],
                            "channels": 1,
                            "parameters": {}
                        }, {
                            "kind": "audio",
                            "mimeType": "audio/PCMA",
                            "preferredPayloadType": 8,
                            "clockRate": 8000,
                            "rtcpFeedback": [{
                                "type": "transport-cc",
                                "parameter": ""
                            }],
                            "channels": 1,
                            "parameters": {}
                        }, {
                            "kind": "audio",
                            "mimeType": "audio/opus",
                            "clockRate": 48000,
                            "channels": 2,
                            "rtcpFeedback": [{
                                "type": "nack",
                                "parameter": ""
                            }, {
                                "type": "transport-cc",
                                "parameter": ""
                            }],
                            "parameters": {},
                            "preferredPayloadType": 100
                        }, {
                            "kind": "video",
                            "mimeType": "video/H264",
                            "clockRate": 90000,
                            "parameters": {
                                "level-asymmetry-allowed": 1,
                                "packetization-mode": 1,
                                "profile-level-id": "4d0032",
                                "x-google-start-bitrate": 1000
                            },
                            "rtcpFeedback": [{
                                "type": "nack",
                                "parameter": ""
                            }, {
                                "type": "nack",
                                "parameter": "pli"
                            }, {
                                "type": "ccm",
                                "parameter": "fir"
                            }, {
                                "type": "goog-remb",
                                "parameter": ""
                            }, {
                                "type": "transport-cc",
                                "parameter": ""
                            }],
                            "preferredPayloadType": 101
                        }, {
                            "kind": "video",
                            "mimeType": "video/rtx",
                            "preferredPayloadType": 102,
                            "clockRate": 90000,
                            "parameters": {
                                "apt": 101
                            },
                            "rtcpFeedback": []
                        }, {
                            "kind": "video",
                            "mimeType": "video/H264",
                            "clockRate": 90000,
                            "parameters": {
                                "level-asymmetry-allowed": 1,
                                "packetization-mode": 1,
                                "profile-level-id": "42e01f",
                                "x-google-start-bitrate": 1000
                            },
                            "rtcpFeedback": [{
                                "type": "nack",
                                "parameter": ""
                            }, {
                                "type": "nack",
                                "parameter": "pli"
                            }, {
                                "type": "ccm",
                                "parameter": "fir"
                            }, {
                                "type": "goog-remb",
                                "parameter": ""
                            }, {
                                "type": "transport-cc",
                                "parameter": ""
                            }],
                            "preferredPayloadType": 103
                        }, {
                            "kind": "video",
                            "mimeType": "video/rtx",
                            "preferredPayloadType": 104,
                            "clockRate": 90000,
                            "parameters": {
                                "apt": 103
                            },
                            "rtcpFeedback": []
                        }, {
                            "kind": "video",
                            "mimeType": "video/VP8",
                            "clockRate": 90000,
                            "rtcpFeedback": [{
                                "type": "nack",
                                "parameter": ""
                            }, {
                                "type": "nack",
                                "parameter": "pli"
                            }, {
                                "type": "ccm",
                                "parameter": "fir"
                            }, {
                                "type": "goog-remb",
                                "parameter": ""
                            }, {
                                "type": "transport-cc",
                                "parameter": ""
                            }],
                            "parameters": {
                                "x-google-start-bitrate": 1000
                            },
                            "preferredPayloadType": 105
                        }, {
                            "kind": "video",
                            "mimeType": "video/rtx",
                            "preferredPayloadType": 106,
                            "clockRate": 90000,
                            "parameters": {
                                "apt": 105
                            },
                            "rtcpFeedback": []
                        }, {
                            "kind": "video",
                            "mimeType": "video/VP9",
                            "clockRate": 90000,
                            "rtcpFeedback": [{
                                "type": "nack",
                                "parameter": ""
                            }, {
                                "type": "nack",
                                "parameter": "pli"
                            }, {
                                "type": "ccm",
                                "parameter": "fir"
                            }, {
                                "type": "goog-remb",
                                "parameter": ""
                            }, {
                                "type": "transport-cc",
                                "parameter": ""
                            }],
                            "parameters": {
                                "profile-id": 2,
                                "x-google-start-bitrate": 1000
                            },
                            "preferredPayloadType": 107
                        }, {
                            "kind": "video",
                            "mimeType": "video/rtx",
                            "preferredPayloadType": 108,
                            "clockRate": 90000,
                            "parameters": {
                                "apt": 107
                            },
                            "rtcpFeedback": []
                        }],
                        "headerExtensions": [{
                            "kind": "audio",
                            "uri": "urn:ietf:params:rtp-hdrext:sdes:mid",
                            "preferredId": 1,
                            "preferredEncrypt": false,
                            "direction": "sendrecv"
                        }, {
                            "kind": "video",
                            "uri": "urn:ietf:params:rtp-hdrext:sdes:mid",
                            "preferredId": 1,
                            "preferredEncrypt": false,
                            "direction": "sendrecv"
                        }, {
                            "kind": "video",
                            "uri": "urn:ietf:params:rtp-hdrext:sdes:rtp-stream-id",
                            "preferredId": 2,
                            "preferredEncrypt": false,
                            "direction": "recvonly"
                        }, {
                            "kind": "video",
                            "uri": "urn:ietf:params:rtp-hdrext:sdes:repaired-rtp-stream-id",
                            "preferredId": 3,
                            "preferredEncrypt": false,
                            "direction": "recvonly"
                        }, {
                            "kind": "audio",
                            "uri": "http://www.webrtc.org/experiments/rtp-hdrext/abs-send-time",
                            "preferredId": 4,
                            "preferredEncrypt": false,
                            "direction": "sendrecv"
                        }, {
                            "kind": "video",
                            "uri": "http://www.webrtc.org/experiments/rtp-hdrext/abs-send-time",
                            "preferredId": 4,
                            "preferredEncrypt": false,
                            "direction": "sendrecv"
                        }, {
                            "kind": "audio",
                            "uri": "http://www.ietf.org/id/draft-holmer-rmcat-transport-wide-cc-extensions-01",
                            "preferredId": 5,
                            "preferredEncrypt": false,
                            "direction": "recvonly"
                        }, {
                            "kind": "video",
                            "uri": "http://www.ietf.org/id/draft-holmer-rmcat-transport-wide-cc-extensions-01",
                            "preferredId": 5,
                            "preferredEncrypt": false,
                            "direction": "sendrecv"
                        }, {
                            "kind": "video",
                            "uri": "http://tools.ietf.org/html/draft-ietf-avtext-framemarking-07",
                            "preferredId": 6,
                            "preferredEncrypt": false,
                            "direction": "sendrecv"
                        }, {
                            "kind": "video",
                            "uri": "urn:ietf:params:rtp-hdrext:framemarking",
                            "preferredId": 7,
                            "preferredEncrypt": false,
                            "direction": "sendrecv"
                        }, {
                            "kind": "audio",
                            "uri": "urn:ietf:params:rtp-hdrext:ssrc-audio-level",
                            "preferredId": 10,
                            "preferredEncrypt": false,
                            "direction": "sendrecv"
                        }, {
                            "kind": "video",
                            "uri": "urn:3gpp:video-orientation",
                            "preferredId": 11,
                            "preferredEncrypt": false,
                            "direction": "sendrecv"
                        }, {
                            "kind": "video",
                            "uri": "urn:ietf:params:rtp-hdrext:toffset",
                            "preferredId": 12,
                            "preferredEncrypt": false,
                            "direction": "sendrecv"
                        }, {
                            "kind": "audio",
                            "uri": "http://www.webrtc.org/experiments/rtp-hdrext/abs-capture-time",
                            "preferredId": 13,
                            "preferredEncrypt": false,
                            "direction": "sendrecv"
                        }, {
                            "kind": "video",
                            "uri": "http://www.webrtc.org/experiments/rtp-hdrext/abs-capture-time",
                            "preferredId": 13,
                            "preferredEncrypt": false,
                            "direction": "sendrecv"
                        }]
                    }
                    """
            )
        }, (Object ... plainArgs) -> {
            final JSONObject transport = (JSONObject) plainArgs[0];
            String ip;
            String port;
            String rtcpPort;
            String transportId;
            try {
                ip          = transport.getString("ip");
                port        = transport.getString("port");
                rtcpPort    = transport.getString("rtcpPort");
                transportId = transport.getString("transportId");
            } catch (JSONException e) {
                log.error("解析响应异常:{}", transport, e);
                return;
            }
            socket.emit("client::media::produce", new Object[] {
                Map.of(
                    "kind",          "video",
                    "clientId",      clientId,
                    "transportId",   transportId,
                    "appData",       Map.of(),
                    "rtpParameters", Map.of(
                        "codecs", List.of(Map.of(
                            "mimeType",     "video/h264",
                            "clockRate",    90000,
                            "payloadType",  101,
                            "rtcpFeedback", List.of(),
                            "parameters",   Map.of(
                                "packetization-mode",      1,
                                "profile-level-id",        "4d0032",
                                "level-asymmetry-allowed", 1
                            )
                        )),
                        "encodings", List.of(Map.of(
                            "ssrc", ssrc
                        ))
                    )
                )
            }, (Object ... produceArgs) -> {
                final JSONObject json = (JSONObject) produceArgs[0];
                log.info("生产视频媒体:{} - {}", ip, json.toString());
                this.rtpServer.putVideo(ssrc, port, rtcpPort);
                this.clients.add(this.rtpServer);
                this.rtpServer = null;
            });
        });
    }

    /**
     * 执行命令
     * 
     * @param command 命令
     */
    public void executeCommand(String command) {
        if(this.ffmpegProcess != null) {
            log.info("已经开始执行命令");
            return;
        }
        try {
            ProcessBuilder processBuilder = null;
            final boolean linux = File.separatorChar == '/';
            if(linux) {
                processBuilder = new ProcessBuilder("/bin/bash", "-c", command);
                this.ffmpegProcess = processBuilder.start();
            } else {
                processBuilder = new ProcessBuilder("cmd", "/c", command);
                this.ffmpegProcess = processBuilder.start();
            }
            final InputStream inputStream = this.ffmpegProcess.getInputStream();
            final InputStream errorStream = this.ffmpegProcess.getErrorStream();
            final StringBuffer input = new StringBuffer();
            final StringBuffer error = new StringBuffer();
            final Thread inputThread = new Thread(() -> {
                try {
                    input.append(linux ? new String(inputStream.readAllBytes()) : new String(inputStream.readAllBytes(), "GBK"));
                } catch (Exception e) {
                    log.error("结果读取异常", e);
                }
            });
            inputThread.setDaemon(true);
            inputThread.start();
            final Thread errorThread = new Thread(() -> {
                try {
                    error.append(linux ? new String(errorStream.readAllBytes()) : new String(errorStream.readAllBytes(), "GBK"));
                } catch (Exception e) {
                    log.error("结果读取异常", e);
                }
            });
            errorThread.setDaemon(true);
            errorThread.start();
            this.ffmpegProcess.waitFor();
            inputStream.close();
            errorStream.close();
            log.info("命令执行完成:{}", input);
            log.info("命令执行完成:{}", error);
        } catch (Exception e) {
            log.error("执行命令异常:{}", command, e);
        }
    }
    
    /**
     * 广播媒体
     * 
     * @param video 视频地址
     * @param fps   帧数
     * @param crf   质量
     */
    public void broadcast(String video, int fps, int crf) {
        if(this.broadcast) {
            log.info("已经开始广播");
            return;
        }
        this.broadcast = true;
        final Thread broadcastThread = new Thread(() -> {
            try {
                final int port = 9898;
                final String host = InetAddress.getLocalHost().getHostAddress();
                this.ffmpegSocket = new DatagramSocket(port);
                // 提送本机
                final String command = String.format("""
                    ffmpeg -re -stream_loop -1 -i %s -map 0:v:0 -c:v h264 -r %d -crf %d -map 0:a:0 -c:a libopus -ab 96k -ac 2 -ar 48000 -f tee "[select=v:f=rtp:ssrc=%d:payload_type=%d]rtp://%s:%d?rtcpport=%d|[select=a:f=rtp:ssrc=%d:payload_type=%d]rtp://%s:%d?rtcpport=%d"
                    """,
                    video, fps, crf,
                    10000, 101, host, port, port,
                    10001, 100, host, port, port
                );
                log.info("执行命令:{}", command);
                final Thread commandThread = new Thread(() -> {
                    this.executeCommand(command);
                });
                commandThread.setDaemon(true);
                commandThread.start();
                final DatagramPacket packet = new DatagramPacket(new byte[8096], 8096);
                // 开始转发
                while(this.broadcast && !this.ffmpegSocket.isClosed()) {
                    this.ffmpegSocket.receive(packet);
                    final byte[] data = packet.getData();
                    final ByteBuffer buffer = ByteBuffer.allocate(4);
                    buffer.put(data, 8, 4);
                    buffer.flip();
                    final int ssrc = buffer.getInt();
                    this.clients.forEach(server -> {
                        try {
                            System.arraycopy(ssrc == 10000 ? server.videoSsrcBytes : server.audioSsrcBytes, 0, data, 8, 4);
                            final DatagramPacket datagramPacket = new DatagramPacket(
                                data,
                                packet.getOffset(),
                                packet.getLength(),
                                new InetSocketAddress(server.ip, ssrc == 10000 ? server.videoPort : server.audioPort)
                            );
                            this.ffmpegSocket.send(datagramPacket);
                        } catch (Exception e) {
                            log.error("转发异常", e);
                        }
                    });
                }
            } catch (Exception e) {
                log.error("转发异常", e);
            }
        });
        broadcastThread.setDaemon(true);
        broadcastThread.start();
    }
    
    /**
     * 关闭资源
     */
    public void close() {
        this.broadcast = false;
        if(this.ffmpegProcess != null) {
            final OutputStream outputStream = this.ffmpegProcess.getOutputStream();
            try {
                outputStream.write('q');
            } catch (IOException e) {
                log.error("推出线程异常", e);
            }
            this.ffmpegProcess.children().forEach(ProcessHandle::destroyForcibly);
            this.ffmpegProcess.descendants().forEach(ProcessHandle::destroyForcibly);
            this.ffmpegProcess.destroyForcibly();
            this.ffmpegProcess = null;
        }
        this.sockets.forEach(Socket::disconnect);
        this.sockets.clear();
        this.clients.clear();
        if(this.ffmpegSocket != null) {
            this.ffmpegSocket.close();
            this.ffmpegSocket = null;
        }
    }
    
    static class RtpServer {
        
        public String ip;
        public int    audioSsrc;
        public int    audioPort;
        public int    audioRtcpPort;
        public byte[] audioSsrcBytes;
        public int    videoSsrc;
        public int    videoPort;
        public int    videoRtcpPort;
        public byte[] videoSsrcBytes;
        
        public RtpServer(String ip) {
            this.ip = ip;
        }
        
        public void putAudio(int ssrc, String port, String rtcpPort) {
            this.audioSsrc = ssrc;
            this.audioPort = Integer.parseInt(port);
            this.audioRtcpPort = Integer.parseInt(rtcpPort);
            final ByteBuffer buffer = ByteBuffer.allocate(4);
            buffer.putInt(ssrc);
            this.audioSsrcBytes = buffer.array();
        }
        
        public void putVideo(int ssrc, String port, String rtcpPort) {
            this.videoSsrc = ssrc;
            this.videoPort = Integer.parseInt(port);
            this.videoRtcpPort = Integer.parseInt(rtcpPort);
            final ByteBuffer buffer = ByteBuffer.allocate(4);
            buffer.putInt(ssrc);
            this.videoSsrcBytes = buffer.array();
        }
        
    }
    
}