Skip to content

Can you help me analyze where the problem lies,about WHEP #168

Open
@liusuyi2021

Description

@liusuyi2021

I use the WHEP protocol to pull streams from streaming media. The connection has been established and the streaming media has started to push streams. I can see the bandwidth received through the network card, but I cannot get the stream on [OnAddStream]

package com.lsy.work.component.webrtc;

import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;

import dev.onvoid.webrtc.*;
import dev.onvoid.webrtc.media.MediaDevices;
import dev.onvoid.webrtc.media.MediaStream;
import dev.onvoid.webrtc.media.MediaStreamTrack;
import dev.onvoid.webrtc.media.audio.AudioDeviceModule;
import dev.onvoid.webrtc.media.audio.AudioOptions;
import dev.onvoid.webrtc.media.audio.AudioTrack;
import dev.onvoid.webrtc.media.audio.AudioTrackSource;
import dev.onvoid.webrtc.media.video.VideoDeviceSource;
import dev.onvoid.webrtc.media.video.VideoTrack;
import dev.onvoid.webrtc.media.video.VideoTrackSink;
import dev.onvoid.webrtc.media.video.VideoTrackSource;
import lombok.extern.slf4j.Slf4j;
import okhttp3.Call;
import okhttp3.Callback;
import okhttp3.ConnectionPool;
import okhttp3.MediaType;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.RequestBody;
import okhttp3.Response;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;

@Slf4j
@Component
public class WHEPClient {
    
    private URL whepUrl;
    FrameGrabber frameGrabber;
    private RTCPeerConnection peerConnection;
    private String eTag = "";
    private List<RTCIceCandidate> queuedCandidates = new ArrayList<>();
    private OkHttpClient httpClient;
    private OfferData offerData;
    private PeerConnectionFactory peerConnectionFactory;

    @PostConstruct
    public void WHEPClientInit() {
        // 初始化一次性的组件
        this.httpClient = new OkHttpClient.Builder()
                .connectionPool(new ConnectionPool(5, 5, TimeUnit.MINUTES))
                .build();
        this.peerConnectionFactory = initializePeerConnectionFactory();
        start("http://192.168.1.2:8889/62218849d69b4146b29437e16f310231_1");
    }

    public void start(String url) {
        log.warn("Requesting ICE servers");
        reset();
        try {
            this.whepUrl = new URL(url + "/whep");
        } catch (MalformedURLException e) {
            throw new RuntimeException(e);
        }
        Request request = new Request.Builder()
                .url(this.whepUrl)
                .method("OPTIONS", null)
                .build();

        httpClient.newCall(request).enqueue(new Callback() {
            @Override
            public void onFailure(Call call, IOException e) {
                log.error("Failed to fetch ICE servers: " + e.getMessage());
                scheduleRestart();
            }

            @Override
            public void onResponse(Call call, Response response) {
                if (response.isSuccessful()) {
                    String linkHeader = response.header("Link");
                    List<RTCIceServer> iceServers = parseIceServers(linkHeader);
                    initializePeerConnection(iceServers);
                } else {
                    log.error("Failed to fetch ICE servers: " + response.message());
                    scheduleRestart();
                }
            }
        });
    }

    // 重置连接状态(切换URL时调用)
    private void reset() {
        if (peerConnection != null) {
            peerConnection.close();
            peerConnection = null;
        }
        eTag = "";
        queuedCandidates.clear();
    }

    private List<RTCIceServer> parseIceServers(String linkHeader) {
        List<RTCIceServer> iceServers = new ArrayList<>();
        if (linkHeader != null) {
            for (String link : linkHeader.split(", ")) {
                // 解析 ICE 服务器信息
                if (link.matches("<.+?>; rel=\"ice-server\".*")) {
                    String url = link.replaceAll("<(.+?)>.*", "$1");
                    String username = null;
                    String credential = null;

                    if (link.contains("username=")) {
                        username = link.replaceAll(".*username=\"(.+?)\".*", "$1");
                        credential = link.replaceAll(".*credential=\"(.+?)\".*", "$1");
                    }

                    RTCIceServer iceServer = new RTCIceServer();
                    iceServer.urls = Arrays.asList(new String[]{url});
                    iceServer.username = username;
                    iceServer.password = credential;
                    iceServers.add(iceServer);
                }
            }
        }
        return iceServers;
    }

    private PeerConnectionFactory initializePeerConnectionFactory() {
        // 创建工厂实例
        return new PeerConnectionFactory();
    }

    // 创建 PeerConnection
    private void initializePeerConnection(List<RTCIceServer> iceServers) {
        RTCConfiguration config = new RTCConfiguration();
        config.iceServers = iceServers;
        config.iceTransportPolicy = RTCIceTransportPolicy.ALL;

        peerConnection = peerConnectionFactory.createPeerConnection(config, new PeerConnectionObserver() {

            @Override
            public void onIceCandidate(RTCIceCandidate rtcIceCandidate) {
                log.warn("ICE Candidate: " + rtcIceCandidate.sdp);
                onLocalCandidate(rtcIceCandidate);
            }
            @Override
            public void onAddStream(MediaStream mediaStream) {
                if (mediaStream.getAudioTracks().length > 0) {
                    VideoTrack videoTrack = mediaStream.getVideoTracks()[0];
                    if (frameGrabber != null) {
                        videoTrack.addSink(frameGrabber); // 关键:触发 onFrame 回调
                    }
                }
            }
            @Override
            public void onTrack(RTCRtpTransceiver transceiver) {
                MediaStreamTrack track = transceiver.getReceiver().getTrack();
                if (track instanceof VideoTrack) {
                    // 设置到你的渲染器
                    FrameGrabber frameGrabber = new FrameGrabber();
                    ((VideoTrack) track).addSink(frameGrabber);
                }
            }


        });

        // 创建虚拟视频源和轨道(不会实际发送数据)
        AudioOptions audioOptions = new AudioOptions();
        AudioTrackSource dummySource = peerConnectionFactory.createAudioSource(audioOptions);
        AudioTrack audioTrack = peerConnectionFactory.createAudioTrack("audio", dummySource);

        // 配置为只接收模式
        RTCRtpTransceiverInit init = new RTCRtpTransceiverInit();
        init.direction = RTCRtpTransceiverDirection.RECV_ONLY;

        VideoDeviceSource videoSource = new VideoDeviceSource();
        VideoTrack videoTrack = peerConnectionFactory.createVideoTrack("video", videoSource);

        // 添加transceiver
        peerConnection.addTransceiver(audioTrack, init);
        peerConnection.addTransceiver(videoTrack, init);

        // 创建 SDP Offer
        createLocalOffer();
    }

    private void createLocalOffer() {
        RTCOfferOptions rtcOfferOptions = new RTCOfferOptions();
        peerConnection.createOffer(rtcOfferOptions, new CreateSessionDescriptionObserver() {
            @Override
            public void onSuccess(RTCSessionDescription rtcSessionDescription) {
                offerData = parseOffer(rtcSessionDescription.sdp);
                peerConnection.setLocalDescription(rtcSessionDescription, new SetSessionDescriptionObserver() {
                    @Override
                    public void onSuccess() {
                        sendLocalOffer(rtcSessionDescription.sdp);
                    }

                    @Override
                    public void onFailure(String error) {
                        log.error("Failed to set local description: " + error);
                        scheduleRestart();
                    }
                });

            }

            @Override
            public void onFailure(String error) {
                log.error("Failed to create offer: " + error);
                scheduleRestart();
            }
        });
    }

    private void sendLocalOffer(String sdpOffer) {
        RequestBody body = RequestBody.create(MediaType.parse("application/sdp"), sdpOffer);
        Request request = new Request.Builder()
                .url(whepUrl)
                .post(body)
                .build();

        httpClient.newCall(request).enqueue(new Callback() {
            @Override
            public void onFailure(Call call, IOException e) {
                log.error("Failed to send offer: " + e.getMessage());
                scheduleRestart();
            }

            @Override
            public void onResponse(Call call, Response response) {
                if (response.isSuccessful()) {
                    eTag = response.header("ETag");
                    String location = response.header("Location");
                    if (location != null) {
                        try {
                            String s = "http://" + whepUrl.getAuthority();
                            whepUrl = new URL(s + location);
                        } catch (Exception e) {
                            log.error("Invalid Location header: " + e.getMessage());
                            scheduleRestart();
                            return;
                        }
                    }
                    try {
                        String sdpAnswer = response.body().string();
                        RTCSessionDescription answer = new RTCSessionDescription(RTCSdpType.ANSWER, sdpAnswer);
                        onRemoteAnswer(answer);
                    } catch (IOException e) {
                        log.error("Failed to read response body: " + e.getMessage());
                        scheduleRestart();
                    }
                } else {
                    log.error("Failed to send offer: " + response.message());
                    scheduleRestart();
                }
            }
        });
    }

    private void onRemoteAnswer(RTCSessionDescription rtcSessionDescriptio) {
        peerConnection.setRemoteDescription(rtcSessionDescriptio, new SetSessionDescriptionObserver() {
            @Override
            public void onSuccess() {
                log.warn("Remote answer set successfully");
                if (!queuedCandidates.isEmpty()) {
                    sendLocalCandidates(queuedCandidates);
                    queuedCandidates.clear();
                }
            }

            @Override
            public void onFailure(String error) {
                log.error("Failed to set remote answer: " + error);
                scheduleRestart();
            }

        });
    }

    private void onLocalCandidate(RTCIceCandidate candidate) {
        if (eTag.isEmpty()) {
            queuedCandidates.add(candidate);
        } else {
            List<RTCIceCandidate> list = new ArrayList<>();
            list.add(candidate);
            sendLocalCandidates(list);
        }
    }

    private void sendLocalCandidates(List<RTCIceCandidate> candidates) {
        String sdpFragment = generateSdpFragment(offerData, candidates);
        RequestBody body = RequestBody.create(MediaType.parse("application/trickle-ice-sdpfrag"), sdpFragment);
        Request request = new Request.Builder()
                .url(whepUrl)
                .patch(body)
                .header("If-Match", eTag)
                .build();

        httpClient.newCall(request).enqueue(new Callback() {
            @Override
            public void onFailure(Call call, IOException e) {
                log.error("Failed to send candidates: " + e.getMessage());
                scheduleRestart();
            }

            @Override
            public void onResponse(Call call, Response response) {
                if (!response.isSuccessful()) {
                    log.error("Failed to send candidates: " + response.message());
                    // scheduleRestart();
                }
            }
        });
    }
    

    private void scheduleRestart() {
        if (peerConnection != null) {
            peerConnection.close();
        }
    }

    public void stop() {
        if (peerConnection != null) {
            peerConnection.close();
        }
    }

    private static class OfferData {
        String iceUfrag;
        String icePwd;
        List<String> medias = new ArrayList<>();
    }

    private OfferData parseOffer(String sdp) {
        OfferData data = new OfferData();
        for (String line : sdp.split("\r\n")) {
            if (line.startsWith("a=ice-ufrag:")) {
                data.iceUfrag = line.substring("a=ice-ufrag:".length());
            } else if (line.startsWith("a=ice-pwd:")) {
                data.icePwd = line.substring("a=ice-pwd:".length());
            } else if (line.startsWith("m=")) {
                data.medias.add(line.substring("m=".length()));
            }
        }
        return data;
    }

    private String generateSdpFragment(OfferData offerData, List<RTCIceCandidate> candidates) {
        StringBuilder sdp = new StringBuilder();
        sdp.append("a=ice-ufrag:").append(offerData.iceUfrag).append("\r\n")
                .append("a=ice-pwd:").append(offerData.icePwd).append("\r\n");

        for (int i = 0; i < offerData.medias.size(); i++) {
            sdp.append("m=").append(offerData.medias.get(i)).append("\r\n")
                    .append("a=mid:").append(i).append("\r\n");

            for (RTCIceCandidate candidate : candidates) {
                if (candidate.sdpMLineIndex == i) {
                    sdp.append("a=").append(candidate.sdp).append("\r\n");
                }
            }
        }
        return sdp.toString();
    }
}

Metadata

Metadata

Assignees

Labels

No labels
No labels

Projects

No projects

Milestone

No milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions