WebSocketServer.gd 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162
  extends Node
  class_name WebSocketServer

  ## Emitted by poll() for each packet read from a connected peer.
  signal message_received(peer_id: int, message)
  ## Emitted when a pending connection completes its WebSocket handshake.
  signal client_connected(peer_id: int)
  ## Emitted by poll() when a peer is found to be no longer in the open state.
  signal client_disconnected(peer_id: int)

  ## Handshake headers copied onto every WebSocketPeer this server creates.
  @export var handshake_headers: PackedStringArray = PackedStringArray()
  ## Protocols copied onto every WebSocketPeer this server creates.
  @export var supported_protocols: PackedStringArray = PackedStringArray()
  ## Milliseconds a pending connection may take before poll() drops it.
  ## (The name keeps its historical spelling because other code reads it.)
  @export var handshake_timout: int = 3000
  ## When true, accepted TCP streams are wrapped in StreamPeerTLS first.
  @export var use_tls: bool = false
  ## Server certificate passed to TLSOptions.server() when use_tls is on.
  @export var tls_cert: X509Certificate
  ## Private key passed to TLSOptions.server() when use_tls is on.
  @export var tls_key: CryptoKey
  ## When true, poll() stops taking new connections from the TCP server.
  ## Turning it on also discards every connection still waiting to finish
  ## its handshake.
  @export var refuse_new_connections := false:
      set(refuse):
          # A custom setter has to store the value itself; without this
          # assignment the property would keep its default of `false`.
          refuse_new_connections = refuse
          if refuse:
              pending_peers.clear()
  ## Bookkeeping for an accepted TCP connection whose handshake has not
  ## finished yet.
  class PendingPeer:
      ## Time.get_ticks_msec() value captured when the entry was created.
      var connect_time: int
      ## The accepted TCP stream.
      var tcp: StreamPeerTCP
      ## Stream used for the connection; initially the same object as `tcp`.
      var connection: StreamPeer
      ## WebSocket peer for this connection; not assigned by the constructor.
      var ws: WebSocketPeer

      func _init(p_tcp: StreamPeerTCP):
          connect_time = Time.get_ticks_msec()
          connection = p_tcp
          tcp = p_tcp
  ## Listening socket that hands out incoming TCP connections.
  var tcp_server: TCPServer = TCPServer.new()
  ## Connections that are still performing their handshake.
  var pending_peers: Array[PendingPeer] = []
  ## Connected peers, keyed by peer id, with WebSocketPeer values.
  var peers: Dictionary = {}
  ## Starts listening for TCP connections on `port`.
  ## Asserts that the server is not already listening and returns the
  ## error code reported by TCPServer.listen().
  func listen(port: int) -> int:
      assert(not tcp_server.is_listening())
      var err: int = tcp_server.listen(port)
      return err
  ## Stops the TCP server and forgets every connected and pending peer.
  func stop():
      tcp_server.stop()
      peers.clear()
      pending_peers.clear()
  ## Sends `message` to one peer or to many peers.
  ##
  ## `peer_id` > 0 targets that single peer (asserted to exist) and returns
  ## the error code of the underlying send call.
  ## `peer_id` == 0 broadcasts to every peer; a negative value broadcasts to
  ## every peer except `-peer_id`. Both multi-peer forms return OK.
  ##
  ## Strings are sent as text; any other value is serialized with
  ## var_to_bytes(), which is what get_message() decodes with bytes_to_var().
  func send(peer_id, message) -> int:
      var is_text: bool = typeof(message) == TYPE_STRING
      if peer_id <= 0:
          # Send to multiple peers (zero = broadcast, negative = exclude one).
          for id in peers:
              if id == -peer_id:
                  continue
              if is_text:
                  peers[id].send_text(message)
              else:
                  # Encode exactly like the single-peer path below so the
                  # receiver can decode broadcasts the same way.
                  peers[id].send(var_to_bytes(message))
          return OK
      assert(peers.has(peer_id))
      var socket = peers[peer_id]
      if is_text:
          return socket.send_text(message)
      return socket.send(var_to_bytes(message))
  ## Pops the next queued packet of `peer_id` (asserted to be a known peer).
  ## Returns null when nothing is queued, a String for text packets, and the
  ## bytes_to_var() decoding of the payload otherwise.
  func get_message(peer_id) -> Variant:
      assert(peers.has(peer_id))
      var ws = peers[peer_id]
      if ws.get_available_packet_count() >= 1:
          var data = ws.get_packet()
          if ws.was_string_packet():
              return data.get_string_from_utf8()
          return bytes_to_var(data)
      return null
  ## Tells whether `peer_id` (asserted to be a known peer) has at least one
  ## packet waiting to be read.
  func has_message(peer_id) -> bool:
      assert(peers.has(peer_id))
      var queued = peers[peer_id].get_available_packet_count()
      return queued > 0
  ## Builds a WebSocketPeer configured with this server's handshake headers
  ## and supported protocols.
  func _create_peer() -> WebSocketPeer:
      var peer = WebSocketPeer.new()
      peer.handshake_headers = handshake_headers
      peer.supported_protocols = supported_protocols
      return peer
  ## Drives the server for one step. Does nothing unless the TCP server is
  ## listening. Otherwise it:
  ##   1. accepts queued TCP connections (unless refuse_new_connections is set),
  ##   2. advances pending handshakes, dropping finished or timed-out ones,
  ##   3. polls connected peers, emitting client_disconnected for those no
  ##      longer open and message_received for every queued packet.
  func poll() -> void:
      if not tcp_server.is_listening():
          return
      while not refuse_new_connections and tcp_server.is_connection_available():
          var conn = tcp_server.take_connection()
          assert(conn != null)
          pending_peers.append(PendingPeer.new(conn))
      # Removals are collected first so the containers are never modified
      # while they are being iterated.
      var to_remove := []
      for p in pending_peers:
          if not _connect_pending(p):
              if p.connect_time + handshake_timout < Time.get_ticks_msec():
                  # Timeout
                  to_remove.append(p)
              continue # Still pending
          # Handshake finished (successfully or not): no longer pending.
          to_remove.append(p)
      for r in to_remove:
          pending_peers.erase(r)
      to_remove.clear()
      for id in peers:
          var p: WebSocketPeer = peers[id]
          p.poll()
          if p.get_ready_state() != WebSocketPeer.STATE_OPEN:
              client_disconnected.emit(id)
              to_remove.append(id)
              continue
          while p.get_available_packet_count():
              message_received.emit(id, get_message(id))
      for r in to_remove:
          peers.erase(r)
      to_remove.clear()
  ## Advances the handshake of one pending connection.
  ## Returns true when the entry is finished and should leave the pending
  ## list (promoted to `peers`, or failed), and false while it is still in
  ## progress.
  func _connect_pending(p: PendingPeer) -> bool:
      if p.ws != null:
          # Poll websocket client if doing handshake
          p.ws.poll()
          var state = p.ws.get_ready_state()
          if state == WebSocketPeer.STATE_OPEN:
              # Draw again on a collision so an existing peer is never
              # silently replaced by the new connection.
              var id := randi_range(2, 1 << 30)
              while peers.has(id):
                  id = randi_range(2, 1 << 30)
              peers[id] = p.ws
              client_connected.emit(id)
              return true # Success.
          elif state != WebSocketPeer.STATE_CONNECTING:
              return true # Failure.
          return false # Still connecting.
      elif p.tcp.get_status() != StreamPeerTCP.STATUS_CONNECTED:
          return true # TCP disconnected.
      elif not use_tls:
          # TCP is ready, create WS peer
          p.ws = _create_peer()
          p.ws.accept_stream(p.tcp)
          return false # WebSocketPeer connection is pending.
      else:
          if p.connection == p.tcp:
              # First visit on the TLS path: wrap the TCP stream in TLS.
              assert(tls_key != null and tls_cert != null)
              var tls = StreamPeerTLS.new()
              tls.accept_stream(p.tcp, TLSOptions.server(tls_key, tls_cert))
              p.connection = tls
          p.connection.poll()
          var status = p.connection.get_status()
          if status == StreamPeerTLS.STATUS_CONNECTED:
              p.ws = _create_peer()
              p.ws.accept_stream(p.connection)
              return false # WebSocketPeer connection is pending.
          if status != StreamPeerTLS.STATUS_HANDSHAKING:
              return true # Failure.
          return false
  ## Polls the server once per processed frame.
  ## The frame delta is unused; the underscore prefix marks it as such.
  func _process(_delta) -> void:
      poll()