Open3D (C++ API)  0.17.0
PeerConnectionManager.h
Go to the documentation of this file.
1 // ----------------------------------------------------------------------------
2 // - Open3D: www.open3d.org -
3 // ----------------------------------------------------------------------------
4 // Copyright (c) 2018-2023 www.open3d.org
5 // SPDX-License-Identifier: MIT
6 // ----------------------------------------------------------------------------
7 // ----------------------------------------------------------------------------
8 // Contains source code from
9 // https://github.com/mpromonet/webrtc-streamer
10 //
11 // This software is in the public domain, furnished "as is", without technical
12 // support, and with no warranty, express or implied, as to its usefulness for
13 // any purpose.
14 // ----------------------------------------------------------------------------
15 //
16 // This is a private header. It shall be hidden from Open3D's public API. Do not
17 // put this in Open3D.h.in.
18 
19 #pragma once
20 
21 #include <api/peer_connection_interface.h>
22 #include <rtc_base/strings/json.h>
23 
24 #include <future>
25 #include <mutex>
26 #include <regex>
27 #include <string>
28 #include <thread>
29 #include <unordered_map>
30 
34 
35 namespace open3d {
36 namespace visualization {
37 namespace webrtc_server {
38 
74  class VideoSink : public rtc::VideoSinkInterface<webrtc::VideoFrame> {
75  public:
76  VideoSink(webrtc::VideoTrackInterface* track) : track_(track) {
77  track_->AddOrUpdateSink(this, rtc::VideoSinkWants());
78  }
79  virtual ~VideoSink() { track_->RemoveSink(this); }
80 
81  // VideoSinkInterface implementation
82  virtual void OnFrame(const webrtc::VideoFrame& video_frame) {
83  rtc::scoped_refptr<webrtc::I420BufferInterface> buffer(
84  video_frame.video_frame_buffer()->ToI420());
85  utility::LogDebug("[{}] frame: {}x{}", OPEN3D_FUNCTION,
86  buffer->height(), buffer->width());
87  }
88 
89  protected:
90  rtc::scoped_refptr<webrtc::VideoTrackInterface> track_;
91  };
92 
93  class SetSessionDescriptionObserver
94  : public webrtc::SetSessionDescriptionObserver {
95  public:
96  static SetSessionDescriptionObserver* Create(
97  webrtc::PeerConnectionInterface* pc,
98  std::promise<const webrtc::SessionDescriptionInterface*>&
99  promise) {
100  return new rtc::RefCountedObject<SetSessionDescriptionObserver>(
101  pc, promise);
102  }
103  virtual void OnSuccess() {
104  std::string sdp;
105  if (pc_->local_description()) {
106  promise_.set_value(pc_->local_description());
107  pc_->local_description()->ToString(&sdp);
108  } else if (pc_->remote_description()) {
109  promise_.set_value(pc_->remote_description());
110  pc_->remote_description()->ToString(&sdp);
111  }
112  }
113  virtual void OnFailure(webrtc::RTCError error) {
114  utility::LogWarning("{}", error.message());
115  promise_.set_value(nullptr);
116  }
117 
118  protected:
119  SetSessionDescriptionObserver(
120  webrtc::PeerConnectionInterface* pc,
121  std::promise<const webrtc::SessionDescriptionInterface*>&
122  promise)
123  : pc_(pc), promise_(promise){};
124 
125  private:
126  webrtc::PeerConnectionInterface* pc_;
127  std::promise<const webrtc::SessionDescriptionInterface*>& promise_;
128  };
129 
130  class CreateSessionDescriptionObserver
131  : public webrtc::CreateSessionDescriptionObserver {
132  public:
133  static CreateSessionDescriptionObserver* Create(
134  webrtc::PeerConnectionInterface* pc,
135  std::promise<const webrtc::SessionDescriptionInterface*>&
136  promise) {
137  return new rtc::RefCountedObject<CreateSessionDescriptionObserver>(
138  pc, promise);
139  }
140  virtual void OnSuccess(webrtc::SessionDescriptionInterface* desc) {
141  std::string sdp;
142  desc->ToString(&sdp);
143  pc_->SetLocalDescription(
144  SetSessionDescriptionObserver::Create(pc_, promise_), desc);
145  }
146  virtual void OnFailure(webrtc::RTCError error) {
147  utility::LogWarning("{}", error.message());
148  promise_.set_value(nullptr);
149  }
150 
151  protected:
152  CreateSessionDescriptionObserver(
153  webrtc::PeerConnectionInterface* pc,
154  std::promise<const webrtc::SessionDescriptionInterface*>&
155  promise)
156  : pc_(pc), promise_(promise){};
157 
158  private:
159  webrtc::PeerConnectionInterface* pc_;
160  std::promise<const webrtc::SessionDescriptionInterface*>& promise_;
161  };
162 
163  class PeerConnectionStatsCollectorCallback
164  : public webrtc::RTCStatsCollectorCallback {
165  public:
166  PeerConnectionStatsCollectorCallback() {}
167  void clearReport() { report_.clear(); }
168  Json::Value getReport() { return report_; }
169 
170  protected:
171  virtual void OnStatsDelivered(
172  const rtc::scoped_refptr<const webrtc::RTCStatsReport>&
173  report) {
174  for (const webrtc::RTCStats& stats : *report) {
175  Json::Value stats_members;
176  for (const webrtc::RTCStatsMemberInterface* member :
177  stats.Members()) {
178  stats_members[member->name()] = member->ValueToString();
179  }
180  report_[stats.id()] = stats_members;
181  }
182  }
183 
184  Json::Value report_;
185  };
186 
187  class DataChannelObserver : public webrtc::DataChannelObserver {
188  public:
189  DataChannelObserver(
190  PeerConnectionManager* peer_connection_manager,
191  rtc::scoped_refptr<webrtc::DataChannelInterface> data_channel,
192  const std::string& peerid)
193  : peer_connection_manager_(peer_connection_manager),
194  data_channel_(data_channel),
195  peerid_(peerid) {
196  data_channel_->RegisterObserver(this);
197  }
198  virtual ~DataChannelObserver() { data_channel_->UnregisterObserver(); }
199 
200  // DataChannelObserver interface
201  virtual void OnStateChange() {
202  // Useful to know when the data channel is established.
203  const std::string label = data_channel_->label();
204  const std::string state =
205  webrtc::DataChannelInterface::DataStateString(
206  data_channel_->state());
208  "DataChannelObserver::OnStateChange label: {}, state: {}, "
209  "peerid: {}",
210  label, state, peerid_);
211  std::string msg(label + " " + state);
212  webrtc::DataBuffer buffer(msg);
213  data_channel_->Send(buffer);
214  // ClientDataChannel is established after ServerDataChannel. Once
215  // ClientDataChannel is established, we need to send initial frames
216  // to the client such that the video is not empty. Afterwards,
217  // video frames will only be sent when the GUI redraws.
218  if (label == "ClientDataChannel" && state == "open") {
219  {
220  std::lock_guard<std::mutex> mutex_lock(
221  peer_connection_manager_
223  peer_connection_manager_->peerid_data_channel_ready_.insert(
224  peerid_);
225  }
226  peer_connection_manager_->SendInitFramesToPeer(peerid_);
227  }
228  if (label == "ClientDataChannel" &&
229  (state == "closed" || state == "closing")) {
230  std::lock_guard<std::mutex> mutex_lock(
231  peer_connection_manager_->peerid_data_channel_mutex_);
232  peer_connection_manager_->peerid_data_channel_ready_.erase(
233  peerid_);
234  }
235  }
236  virtual void OnMessage(const webrtc::DataBuffer& buffer) {
237  std::string msg((const char*)buffer.data.data(),
238  buffer.data.size());
239  utility::LogDebug("DataChannelObserver::OnMessage: {}, msg: {}.",
240  data_channel_->label(), msg);
241  std::string reply =
242  WebRTCWindowSystem::GetInstance()->OnDataChannelMessage(
243  msg);
244  if (!reply.empty()) {
245  webrtc::DataBuffer buffer(reply);
246  data_channel_->Send(buffer);
247  }
248  }
249 
250  protected:
251  PeerConnectionManager* peer_connection_manager_;
252  rtc::scoped_refptr<webrtc::DataChannelInterface> data_channel_;
253  const std::string peerid_;
254  };
255 
256  class PeerConnectionObserver : public webrtc::PeerConnectionObserver {
257  public:
258  PeerConnectionObserver(
259  PeerConnectionManager* peer_connection_manager,
260  const std::string& peerid,
261  const webrtc::PeerConnectionInterface::RTCConfiguration& config,
262  std::unique_ptr<cricket::PortAllocator> port_allocator)
263  : peer_connection_manager_(peer_connection_manager),
264  peerid_(peerid),
265  local_channel_(nullptr),
266  remote_channel_(nullptr),
267  ice_candidate_list_(Json::arrayValue),
268  deleting_(false) {
269  pc_ = peer_connection_manager_->peer_connection_factory_
270  ->CreatePeerConnection(config,
271  std::move(port_allocator),
272  nullptr, this);
273 
274  if (pc_.get()) {
275  rtc::scoped_refptr<webrtc::DataChannelInterface> channel =
276  pc_->CreateDataChannel("ServerDataChannel", nullptr);
277  local_channel_ = new DataChannelObserver(
278  peer_connection_manager_, channel, peerid_);
279  }
280 
281  stats_callback_ = new rtc::RefCountedObject<
282  PeerConnectionStatsCollectorCallback>();
283  };
284 
285  virtual ~PeerConnectionObserver() {
286  delete local_channel_;
287  delete remote_channel_;
288  if (pc_.get()) {
289  // warning: pc->close call OnIceConnectionChange
290  deleting_ = true;
291  pc_->Close();
292  }
293  }
294 
295  Json::Value GetIceCandidateList() { return ice_candidate_list_; }
296 
297  Json::Value GetStats() {
298  stats_callback_->clearReport();
299  pc_->GetStats(stats_callback_);
300  int count = 10;
301  while ((stats_callback_->getReport().empty()) && (--count > 0)) {
302  std::this_thread::sleep_for(std::chrono::milliseconds(1000));
303  }
304  return Json::Value(stats_callback_->getReport());
305  };
306 
307  rtc::scoped_refptr<webrtc::PeerConnectionInterface>
309  return pc_;
310  };
311 
312  // PeerConnectionObserver interface
313  virtual void OnAddStream(
314  rtc::scoped_refptr<webrtc::MediaStreamInterface> stream) {
315  utility::LogDebug("[{}] GetVideoTracks().size(): {}.",
316  OPEN3D_FUNCTION, stream->GetVideoTracks().size());
317  webrtc::VideoTrackVector videoTracks = stream->GetVideoTracks();
318  if (videoTracks.size() > 0) {
319  video_sink_.reset(new VideoSink(videoTracks.at(0)));
320  }
321  }
322  virtual void OnRemoveStream(
323  rtc::scoped_refptr<webrtc::MediaStreamInterface> stream) {
324  video_sink_.reset();
325  }
326  virtual void OnDataChannel(
327  rtc::scoped_refptr<webrtc::DataChannelInterface> channel) {
329  "PeerConnectionObserver::OnDataChannel peerid: {}",
330  peerid_);
331  remote_channel_ = new DataChannelObserver(peer_connection_manager_,
332  channel, peerid_);
333  }
334  virtual void OnRenegotiationNeeded() {
335  std::lock_guard<std::mutex> mutex_lock(
336  peer_connection_manager_->peerid_data_channel_mutex_);
337  peer_connection_manager_->peerid_data_channel_ready_.erase(peerid_);
339  "PeerConnectionObserver::OnRenegotiationNeeded peerid: {}",
340  peerid_);
341  }
342  virtual void OnIceCandidate(
343  const webrtc::IceCandidateInterface* candidate);
344 
345  virtual void OnSignalingChange(
346  webrtc::PeerConnectionInterface::SignalingState state) {
347  utility::LogDebug("state: {}, peerid: {}", state, peerid_);
348  }
349  virtual void OnIceConnectionChange(
350  webrtc::PeerConnectionInterface::IceConnectionState state) {
351  if ((state ==
352  webrtc::PeerConnectionInterface::kIceConnectionFailed) ||
353  (state ==
354  webrtc::PeerConnectionInterface::kIceConnectionClosed)) {
355  ice_candidate_list_.clear();
356  if (!deleting_) {
357  std::thread([this]() {
358  peer_connection_manager_->HangUp(peerid_);
359  }).detach();
360  }
361  }
362  }
363 
364  virtual void OnIceGatheringChange(
365  webrtc::PeerConnectionInterface::IceGatheringState) {}
366 
367  private:
368  PeerConnectionManager* peer_connection_manager_;
369  const std::string peerid_;
370  rtc::scoped_refptr<webrtc::PeerConnectionInterface> pc_;
371  DataChannelObserver* local_channel_;
372  DataChannelObserver* remote_channel_;
373  Json::Value ice_candidate_list_;
374  rtc::scoped_refptr<PeerConnectionStatsCollectorCallback>
375  stats_callback_;
376  std::unique_ptr<VideoSink> video_sink_;
377  bool deleting_;
378  };
379 
380 public:
381  PeerConnectionManager(const std::list<std::string>& ice_server_list,
382  const Json::Value& config,
383  const std::string& publish_filter,
384  const std::string& webrtc_udp_port_range);
385  virtual ~PeerConnectionManager();
386 
388  const std::map<std::string, HttpServerRequestHandler::HttpFunction>
389  GetHttpApi();
390 
391  const Json::Value GetIceCandidateList(const std::string& peerid);
392  const Json::Value AddIceCandidate(const std::string& peerid,
393  const Json::Value& json_message);
394  const Json::Value GetMediaList();
395  const Json::Value HangUp(const std::string& peerid);
396  const Json::Value Call(const std::string& peerid,
397  const std::string& window_uid,
398  const std::string& options,
399  const Json::Value& json_message);
400  const Json::Value GetIceServers();
401 
402  void SendInitFramesToPeer(const std::string& peerid);
403 
404  void CloseWindowConnections(const std::string& window_uid);
405 
406  void OnFrame(const std::string& window_uid,
407  const std::shared_ptr<core::Tensor>& im);
408 
409 protected:
410  rtc::scoped_refptr<BitmapTrackSourceInterface> GetVideoTrackSource(
411  const std::string& window_uid);
412  PeerConnectionObserver* CreatePeerConnection(const std::string& peerid);
413  bool AddStreams(webrtc::PeerConnectionInterface* peer_connection,
414  const std::string& window_uid,
415  const std::string& options);
416  rtc::scoped_refptr<BitmapTrackSourceInterface> CreateVideoSource(
417  const std::string& window_uid,
418  const std::map<std::string, std::string>& opts);
419  bool WindowStillUsed(const std::string& window_uid);
420  rtc::scoped_refptr<webrtc::PeerConnectionInterface> GetPeerConnection(
421  const std::string& peerid);
422 
423 protected:
424  std::unique_ptr<webrtc::TaskQueueFactory> task_queue_factory_;
425  rtc::scoped_refptr<webrtc::PeerConnectionFactoryInterface>
427 
428  // Each peer has exactly one connection.
429  std::unordered_map<std::string, PeerConnectionObserver*>
432  // Set of peerids with data channel ready for communication
433  std::unordered_set<std::string> peerid_data_channel_ready_;
435 
436  // Each Window has exactly one TrackSource.
437  std::unordered_map<std::string,
438  rtc::scoped_refptr<BitmapTrackSourceInterface>>
441 
442  // Each Window can be connected to zero, one or more peers.
443  std::unordered_map<std::string, std::set<std::string>>
445  std::unordered_map<std::string, std::string> peerid_to_window_uid_;
446  // Shared by window_uid_to_peerids_ and peerid_to_window_uid_.
448 
449  std::list<std::string> ice_server_list_;
450  const Json::Value config_;
451  const std::regex publish_filter_;
452  std::map<std::string, HttpServerRequestHandler::HttpFunction> func_;
453  std::string webrtc_port_range_;
454 };
455 
456 } // namespace webrtc_server
457 } // namespace visualization
458 } // namespace open3d
#define LogWarning(...)
Definition: Logging.h:60
#define LogInfo(...)
Definition: Logging.h:70
#define LogDebug(...)
Definition: Logging.h:79
#define OPEN3D_FUNCTION
Definition: Macro.h:40
PeerConnectionManager(const std::list< std::string > &ice_server_list, const Json::Value &config, const std::string &publish_filter, const std::string &webrtc_udp_port_range)
Definition: PeerConnectionManager.cpp:126
const Json::Value AddIceCandidate(const std::string &peerid, const Json::Value &json_message)
Definition: PeerConnectionManager.cpp:251
std::mutex peerid_data_channel_mutex_
Definition: PeerConnectionManager.h:434
PeerConnectionObserver * CreatePeerConnection(const std::string &peerid)
Definition: PeerConnectionManager.cpp:522
virtual ~PeerConnectionManager()
Definition: PeerConnectionManager.cpp:200
std::mutex window_uid_to_peerids_mutex_
Definition: PeerConnectionManager.h:447
rtc::scoped_refptr< BitmapTrackSourceInterface > CreateVideoSource(const std::string &window_uid, const std::map< std::string, std::string > &opts)
Definition: PeerConnectionManager.cpp:563
bool WindowStillUsed(const std::string &window_uid)
Definition: PeerConnectionManager.cpp:417
std::mutex peerid_to_connection_mutex_
Definition: PeerConnectionManager.h:431
const Json::Value Call(const std::string &peerid, const std::string &window_uid, const std::string &options, const Json::Value &json_message)
Definition: PeerConnectionManager.cpp:305
std::unique_ptr< webrtc::TaskQueueFactory > task_queue_factory_
Definition: PeerConnectionManager.h:424
std::unordered_map< std::string, std::set< std::string > > window_uid_to_peerids_
Definition: PeerConnectionManager.h:444
bool InitializePeerConnection()
Definition: PeerConnectionManager.cpp:516
void SendInitFramesToPeer(const std::string &peerid)
Definition: PeerConnectionManager.cpp:708
std::unordered_map< std::string, std::string > peerid_to_window_uid_
Definition: PeerConnectionManager.h:445
const Json::Value GetIceCandidateList(const std::string &peerid)
Definition: PeerConnectionManager.cpp:499
const Json::Value HangUp(const std::string &peerid)
Definition: PeerConnectionManager.cpp:435
const Json::Value GetMediaList()
Definition: PeerConnectionManager.cpp:203
void OnFrame(const std::string &window_uid, const std::shared_ptr< core::Tensor > &im)
Definition: PeerConnectionManager.cpp:732
std::list< std::string > ice_server_list_
Definition: PeerConnectionManager.h:449
const Json::Value GetIceServers()
Definition: PeerConnectionManager.cpp:217
const std::map< std::string, HttpServerRequestHandler::HttpFunction > GetHttpApi()
Definition: PeerConnectionManager.cpp:494
std::map< std::string, HttpServerRequestHandler::HttpFunction > func_
Definition: PeerConnectionManager.h:452
std::string webrtc_port_range_
Definition: PeerConnectionManager.h:453
std::unordered_map< std::string, PeerConnectionObserver * > peerid_to_connection_
Definition: PeerConnectionManager.h:430
void CloseWindowConnections(const std::string &window_uid)
Definition: PeerConnectionManager.cpp:714
rtc::scoped_refptr< webrtc::PeerConnectionInterface > GetPeerConnection(const std::string &peerid)
Definition: PeerConnectionManager.cpp:241
std::unordered_set< std::string > peerid_data_channel_ready_
Definition: PeerConnectionManager.h:433
const std::regex publish_filter_
Definition: PeerConnectionManager.h:451
rtc::scoped_refptr< BitmapTrackSourceInterface > GetVideoTrackSource(const std::string &window_uid)
Definition: PeerConnectionManager.cpp:696
bool AddStreams(webrtc::PeerConnectionInterface *peer_connection, const std::string &window_uid, const std::string &options)
Definition: PeerConnectionManager.cpp:575
std::mutex window_uid_to_track_source_mutex_
Definition: PeerConnectionManager.h:440
const Json::Value config_
Definition: PeerConnectionManager.h:450
std::unordered_map< std::string, rtc::scoped_refptr< BitmapTrackSourceInterface > > window_uid_to_track_source_
Definition: PeerConnectionManager.h:439
rtc::scoped_refptr< webrtc::PeerConnectionFactoryInterface > peer_connection_factory_
Definition: PeerConnectionManager.h:426
static std::shared_ptr< WebRTCWindowSystem > GetInstance()
Definition: WebRTCWindowSystem.cpp:111
int count
Definition: FilePCD.cpp:42
Definition: PinholeCameraIntrinsic.cpp:16