implement communication via broadcastchannel
This commit is contained in:
parent
c684411cf3
commit
a012dcac2b
5 changed files with 321 additions and 108 deletions
14
package-lock.json
generated
14
package-lock.json
generated
|
@ -2683,9 +2683,9 @@
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
"lib0": {
|
"lib0": {
|
||||||
"version": "0.1.6",
|
"version": "0.1.7",
|
||||||
"resolved": "https://registry.npmjs.org/lib0/-/lib0-0.1.6.tgz",
|
"resolved": "https://registry.npmjs.org/lib0/-/lib0-0.1.7.tgz",
|
||||||
"integrity": "sha512-MhOo53Ji256M/6x6ZcFmz/ySOXt9Jsn00QWw8RVdEBHqTaWAUYzIu5tvKh63IbjH7+Spv7em3vkn+zkKWHA/pA=="
|
"integrity": "sha512-ooa/CXJ76VMY3ZaE0lYNYkLp/OND1jobHHY6QUtSDxne+xb3+Gl6f8WPUXdBkgnPEv59s/+jCdN4HOumdI5rSg=="
|
||||||
},
|
},
|
||||||
"live-server": {
|
"live-server": {
|
||||||
"version": "1.2.1",
|
"version": "1.2.1",
|
||||||
|
@ -4750,12 +4750,12 @@
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
"yjs": {
|
"yjs": {
|
||||||
"version": "13.0.0-102",
|
"version": "13.0.0-103",
|
||||||
"resolved": "https://registry.npmjs.org/yjs/-/yjs-13.0.0-102.tgz",
|
"resolved": "https://registry.npmjs.org/yjs/-/yjs-13.0.0-103.tgz",
|
||||||
"integrity": "sha512-Uda/w8yIwX57CHXS9mmP3X9N6Q9DHVAXCEgZJG/c7e4oHtN/A6JwAsltr4fVph0p5PPdFHf7uAUVaM5XJ6ZeIg==",
|
"integrity": "sha512-CFAkbYcC336k8PIWxp8aaVx/dOw4jiY7AhdxZ8OfYmre3/3X2PbrBaOSh826W3gn/+HdXZpDJzReyHOb7RfdvQ==",
|
||||||
"dev": true,
|
"dev": true,
|
||||||
"requires": {
|
"requires": {
|
||||||
"lib0": "^0.1.1"
|
"lib0": "^0.1.7"
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -10,7 +10,7 @@
|
||||||
"debug": "concurrently 'live-server --port=3443 --entry-file=test.html' 'npm run watch'",
|
"debug": "concurrently 'live-server --port=3443 --entry-file=test.html' 'npm run watch'",
|
||||||
"dist": "rm -rf dist && rollup -c",
|
"dist": "rm -rf dist && rollup -c",
|
||||||
"watch": "rollup -wc",
|
"watch": "rollup -wc",
|
||||||
"lint": "standard",
|
"lint": "standard && tsc",
|
||||||
"preversion": "npm run lint && npm run dist"
|
"preversion": "npm run lint && npm run dist"
|
||||||
},
|
},
|
||||||
"bin": {
|
"bin": {
|
||||||
|
@ -42,7 +42,7 @@
|
||||||
},
|
},
|
||||||
"dependencies": {
|
"dependencies": {
|
||||||
"simple-peer": "^9.6.2",
|
"simple-peer": "^9.6.2",
|
||||||
"lib0": "^0.1.6",
|
"lib0": "^0.1.7",
|
||||||
"y-protocols": "^0.1.0"
|
"y-protocols": "^0.1.0"
|
||||||
},
|
},
|
||||||
"devDependencies": {
|
"devDependencies": {
|
||||||
|
@ -54,10 +54,10 @@
|
||||||
"rollup-plugin-node-resolve": "^5.2.0",
|
"rollup-plugin-node-resolve": "^5.2.0",
|
||||||
"rollup-plugin-terser": "^5.1.3",
|
"rollup-plugin-terser": "^5.1.3",
|
||||||
"standard": "^12.0.1",
|
"standard": "^12.0.1",
|
||||||
"yjs": "^13.0.0-102"
|
"yjs": "^13.0.0-103"
|
||||||
},
|
},
|
||||||
"peerDependenies": {
|
"peerDependenies": {
|
||||||
"yjs": ">=13.0.0-102"
|
"yjs": ">=13.0.0-103"
|
||||||
},
|
},
|
||||||
"optionalDependencies": {
|
"optionalDependencies": {
|
||||||
"ws": "^7.2.0"
|
"ws": "^7.2.0"
|
||||||
|
|
|
@ -2,7 +2,6 @@
|
||||||
|
|
||||||
import * as encoding from 'lib0/encoding.js'
|
import * as encoding from 'lib0/encoding.js'
|
||||||
import * as decoding from 'lib0/decoding.js'
|
import * as decoding from 'lib0/decoding.js'
|
||||||
import * as buffer from 'lib0/buffer.js'
|
|
||||||
import * as promise from 'lib0/promise.js'
|
import * as promise from 'lib0/promise.js'
|
||||||
import * as error from 'lib0/error.js'
|
import * as error from 'lib0/error.js'
|
||||||
import * as string from 'lib0/string.js'
|
import * as string from 'lib0/string.js'
|
||||||
|
@ -41,41 +40,52 @@ export const deriveKey = (secret, roomName) => {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @param {any} data A json object to be encrypted
|
* @param {Uint8Array} data data to be encrypted
|
||||||
* @param {CryptoKey} key
|
* @param {CryptoKey?} key
|
||||||
* @return {PromiseLike<string>} encrypted, base64 encoded message
|
* @return {PromiseLike<Uint8Array>} encrypted, base64 encoded message
|
||||||
*/
|
*/
|
||||||
export const encrypt = (data, key) => {
|
export const encrypt = (data, key) => {
|
||||||
|
if (!key) {
|
||||||
|
return /** @type {PromiseLike<Uint8Array>} */ (promise.resolve(data))
|
||||||
|
}
|
||||||
const iv = crypto.getRandomValues(new Uint8Array(12))
|
const iv = crypto.getRandomValues(new Uint8Array(12))
|
||||||
const dataEncoder = encoding.createEncoder()
|
|
||||||
encoding.writeAny(dataEncoder, data)
|
|
||||||
const dataBuffer = encoding.toUint8Array(dataEncoder)
|
|
||||||
return crypto.subtle.encrypt(
|
return crypto.subtle.encrypt(
|
||||||
{
|
{
|
||||||
name: 'AES-GCM',
|
name: 'AES-GCM',
|
||||||
iv
|
iv
|
||||||
},
|
},
|
||||||
key,
|
key,
|
||||||
dataBuffer
|
data
|
||||||
).then(cipher => {
|
).then(cipher => {
|
||||||
const encryptedDataEncoder = encoding.createEncoder()
|
const encryptedDataEncoder = encoding.createEncoder()
|
||||||
encoding.writeVarString(encryptedDataEncoder, 'AES-GCM')
|
encoding.writeVarString(encryptedDataEncoder, 'AES-GCM')
|
||||||
encoding.writeVarUint8Array(encryptedDataEncoder, iv)
|
encoding.writeVarUint8Array(encryptedDataEncoder, iv)
|
||||||
encoding.writeVarUint8Array(encryptedDataEncoder, new Uint8Array(cipher))
|
encoding.writeVarUint8Array(encryptedDataEncoder, new Uint8Array(cipher))
|
||||||
return buffer.toBase64(encoding.toUint8Array(encryptedDataEncoder))
|
return encoding.toUint8Array(encryptedDataEncoder)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @param {string} data
|
* @param {Object} data data to be encrypted
|
||||||
* @param {CryptoKey} key
|
* @param {CryptoKey?} key
|
||||||
* @return {PromiseLike<any>} decrypted object
|
* @return {PromiseLike<Uint8Array>} encrypted data, if key is provided
|
||||||
|
*/
|
||||||
|
export const encryptJson = (data, key) => {
|
||||||
|
const dataEncoder = encoding.createEncoder()
|
||||||
|
encoding.writeAny(dataEncoder, data)
|
||||||
|
return encrypt(encoding.toUint8Array(dataEncoder), key)
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param {Uint8Array} data
|
||||||
|
* @param {CryptoKey?} key
|
||||||
|
* @return {PromiseLike<Uint8Array>} decrypted buffer
|
||||||
*/
|
*/
|
||||||
export const decrypt = (data, key) => {
|
export const decrypt = (data, key) => {
|
||||||
if (typeof data !== 'string') {
|
if (!key) {
|
||||||
return promise.reject()
|
return /** @type {PromiseLike<Uint8Array>} */ (promise.resolve(data))
|
||||||
}
|
}
|
||||||
const dataDecoder = decoding.createDecoder(buffer.fromBase64(data))
|
const dataDecoder = decoding.createDecoder(data)
|
||||||
const algorithm = decoding.readVarString(dataDecoder)
|
const algorithm = decoding.readVarString(dataDecoder)
|
||||||
if (algorithm !== 'AES-GCM') {
|
if (algorithm !== 'AES-GCM') {
|
||||||
promise.reject(error.create('Unknown encryption algorithm'))
|
promise.reject(error.create('Unknown encryption algorithm'))
|
||||||
|
@ -89,7 +99,15 @@ export const decrypt = (data, key) => {
|
||||||
},
|
},
|
||||||
key,
|
key,
|
||||||
cipher
|
cipher
|
||||||
).then(decryptedValue =>
|
).then(data => new Uint8Array(data))
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param {Uint8Array} data
|
||||||
|
* @param {CryptoKey?} key
|
||||||
|
* @return {PromiseLike<Object>} decrypted object
|
||||||
|
*/
|
||||||
|
export const decryptJson = (data, key) =>
|
||||||
|
decrypt(data, key).then(decryptedValue =>
|
||||||
decoding.readAny(decoding.createDecoder(new Uint8Array(decryptedValue)))
|
decoding.readAny(decoding.createDecoder(new Uint8Array(decryptedValue)))
|
||||||
)
|
)
|
||||||
}
|
|
||||||
|
|
346
src/y-webrtc.js
346
src/y-webrtc.js
|
@ -7,6 +7,9 @@ import * as decoding from 'lib0/decoding.js'
|
||||||
import { Observable } from 'lib0/observable.js'
|
import { Observable } from 'lib0/observable.js'
|
||||||
import * as logging from 'lib0/logging.js'
|
import * as logging from 'lib0/logging.js'
|
||||||
import * as promise from 'lib0/promise.js'
|
import * as promise from 'lib0/promise.js'
|
||||||
|
import * as bc from 'lib0/broadcastchannel.js'
|
||||||
|
import * as buffer from 'lib0/buffer.js'
|
||||||
|
import { createMutex } from 'lib0/mutex.js'
|
||||||
|
|
||||||
import * as Y from 'yjs' // eslint-disable-line
|
import * as Y from 'yjs' // eslint-disable-line
|
||||||
import Peer from 'simple-peer/simplepeer.min.js'
|
import Peer from 'simple-peer/simplepeer.min.js'
|
||||||
|
@ -21,6 +24,7 @@ const log = logging.createModuleLogger('y-webrtc')
|
||||||
const messageSync = 0
|
const messageSync = 0
|
||||||
const messageQueryAwareness = 3
|
const messageQueryAwareness = 3
|
||||||
const messageAwareness = 1
|
const messageAwareness = 1
|
||||||
|
const messageBcPeerId = 4
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @type {Map<string, SignalingConn>}
|
* @type {Map<string, SignalingConn>}
|
||||||
|
@ -50,30 +54,27 @@ const checkIsSynced = room => {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @param {WebrtcConn} peerConn
|
* @param {Room} room
|
||||||
* @param {Uint8Array} buf
|
* @param {Uint8Array} buf
|
||||||
|
* @param {function} syncedCallback
|
||||||
* @return {encoding.Encoder?}
|
* @return {encoding.Encoder?}
|
||||||
*/
|
*/
|
||||||
const readPeerMessage = (peerConn, buf) => {
|
const readMessage = (room, buf, syncedCallback) => {
|
||||||
const decoder = decoding.createDecoder(buf)
|
const decoder = decoding.createDecoder(buf)
|
||||||
const encoder = encoding.createEncoder()
|
const encoder = encoding.createEncoder()
|
||||||
const messageType = decoding.readVarUint(decoder)
|
const messageType = decoding.readVarUint(decoder)
|
||||||
const room = peerConn.room
|
|
||||||
log('received message from ', logging.BOLD, peerConn.remotePeerId, logging.GREY, ' (' + room.name + ')', logging.UNBOLD, logging.UNCOLOR, ' message type: ', logging.BOLD, messageType)
|
|
||||||
if (room === undefined) {
|
if (room === undefined) {
|
||||||
return null
|
return null
|
||||||
}
|
}
|
||||||
const provider = room.provider
|
const awareness = room.awareness
|
||||||
const doc = room.doc
|
const doc = room.doc
|
||||||
let sendReply = false
|
let sendReply = false
|
||||||
switch (messageType) {
|
switch (messageType) {
|
||||||
case messageSync:
|
case messageSync:
|
||||||
encoding.writeVarUint(encoder, messageSync)
|
encoding.writeVarUint(encoder, messageSync)
|
||||||
const syncMessageType = syncProtocol.readSyncMessage(decoder, encoder, doc, room.provider)
|
const syncMessageType = syncProtocol.readSyncMessage(decoder, encoder, doc, room)
|
||||||
if (syncMessageType === syncProtocol.messageYjsSyncStep2 && !room.synced) {
|
if (syncMessageType === syncProtocol.messageYjsSyncStep2 && !room.synced) {
|
||||||
peerConn.synced = true
|
syncedCallback()
|
||||||
log('synced ', logging.BOLD, room.name, logging.UNBOLD, ' with ', logging.BOLD, peerConn.remotePeerId)
|
|
||||||
checkIsSynced(room)
|
|
||||||
}
|
}
|
||||||
if (syncMessageType === syncProtocol.messageYjsSyncStep1) {
|
if (syncMessageType === syncProtocol.messageYjsSyncStep1) {
|
||||||
sendReply = true
|
sendReply = true
|
||||||
|
@ -81,12 +82,35 @@ const readPeerMessage = (peerConn, buf) => {
|
||||||
break
|
break
|
||||||
case messageQueryAwareness:
|
case messageQueryAwareness:
|
||||||
encoding.writeVarUint(encoder, messageAwareness)
|
encoding.writeVarUint(encoder, messageAwareness)
|
||||||
encoding.writeVarUint8Array(encoder, awarenessProtocol.encodeAwarenessUpdate(provider.awareness, Array.from(provider.awareness.getStates().keys())))
|
encoding.writeVarUint8Array(encoder, awarenessProtocol.encodeAwarenessUpdate(awareness, Array.from(awareness.getStates().keys())))
|
||||||
sendReply = true
|
sendReply = true
|
||||||
break
|
break
|
||||||
case messageAwareness:
|
case messageAwareness:
|
||||||
awarenessProtocol.applyAwarenessUpdate(provider.awareness, decoding.readVarUint8Array(decoder), provider)
|
awarenessProtocol.applyAwarenessUpdate(awareness, decoding.readVarUint8Array(decoder), room)
|
||||||
break
|
break
|
||||||
|
case messageBcPeerId: {
|
||||||
|
const add = decoding.readUint8(decoder) === 1
|
||||||
|
const peerName = decoding.readVarString(decoder)
|
||||||
|
if (peerName !== room.peerId && ((room.bcConns.has(peerName) && !add) || (!room.bcConns.has(peerName) && add))) {
|
||||||
|
const removed = []
|
||||||
|
const added = []
|
||||||
|
if (add) {
|
||||||
|
room.bcConns.add(peerName)
|
||||||
|
added.push(peerName)
|
||||||
|
} else {
|
||||||
|
room.bcConns.delete(peerName)
|
||||||
|
removed.push(peerName)
|
||||||
|
}
|
||||||
|
room.provider.emit('peers', [{
|
||||||
|
added,
|
||||||
|
removed,
|
||||||
|
webrtcPeers: Array.from(room.webrtcConns.keys()),
|
||||||
|
bcPeers: Array.from(room.bcConns)
|
||||||
|
}])
|
||||||
|
broadcastBcPeerId(room)
|
||||||
|
}
|
||||||
|
break
|
||||||
|
}
|
||||||
default:
|
default:
|
||||||
console.error('Unable to compute message')
|
console.error('Unable to compute message')
|
||||||
return encoder
|
return encoder
|
||||||
|
@ -98,6 +122,21 @@ const readPeerMessage = (peerConn, buf) => {
|
||||||
return encoder
|
return encoder
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param {WebrtcConn} peerConn
|
||||||
|
* @param {Uint8Array} buf
|
||||||
|
* @return {encoding.Encoder?}
|
||||||
|
*/
|
||||||
|
const readPeerMessage = (peerConn, buf) => {
|
||||||
|
const room = peerConn.room
|
||||||
|
log('received message from ', logging.BOLD, peerConn.remotePeerId, logging.GREY, ' (', room.name, ')', logging.UNBOLD, logging.UNCOLOR)
|
||||||
|
return readMessage(room, buf, () => {
|
||||||
|
peerConn.synced = true
|
||||||
|
log('synced ', logging.BOLD, room.name, logging.UNBOLD, ' with ', logging.BOLD, peerConn.remotePeerId)
|
||||||
|
checkIsSynced(room)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @param {WebrtcConn} webrtcConn
|
* @param {WebrtcConn} webrtcConn
|
||||||
* @param {encoding.Encoder} encoder
|
* @param {encoding.Encoder} encoder
|
||||||
|
@ -111,11 +150,10 @@ const sendWebrtcConn = (webrtcConn, encoder) => {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @param {Room} room
|
* @param {Room} room
|
||||||
* @param {encoding.Encoder} encoder
|
* @param {Uint8Array} m
|
||||||
*/
|
*/
|
||||||
const broadcastWebrtcConn = (room, encoder) => {
|
const broadcastWebrtcConn = (room, m) => {
|
||||||
log('broadcast message in ', logging.BOLD, room.name, logging.UNBOLD)
|
log('broadcast message in ', logging.BOLD, room.name, logging.UNBOLD)
|
||||||
const m = encoding.toUint8Array(encoder)
|
|
||||||
room.webrtcConns.forEach(conn => {
|
room.webrtcConns.forEach(conn => {
|
||||||
try {
|
try {
|
||||||
conn.peer.send(m)
|
conn.peer.send(m)
|
||||||
|
@ -150,7 +188,7 @@ export class WebrtcConn {
|
||||||
// send sync step 1
|
// send sync step 1
|
||||||
const provider = room.provider
|
const provider = room.provider
|
||||||
const doc = provider.doc
|
const doc = provider.doc
|
||||||
const awareness = provider.awareness
|
const awareness = room.awareness
|
||||||
const encoder = encoding.createEncoder()
|
const encoder = encoding.createEncoder()
|
||||||
encoding.writeVarUint(encoder, messageSync)
|
encoding.writeVarUint(encoder, messageSync)
|
||||||
syncProtocol.writeSyncStep1(encoder, doc)
|
syncProtocol.writeSyncStep1(encoder, doc)
|
||||||
|
@ -166,12 +204,21 @@ export class WebrtcConn {
|
||||||
this.peer.on('close', () => {
|
this.peer.on('close', () => {
|
||||||
this.connected = false
|
this.connected = false
|
||||||
this.closed = true
|
this.closed = true
|
||||||
room.webrtcConns.delete(this.remotePeerId)
|
if (room.webrtcConns.has(this.remotePeerId)) {
|
||||||
|
room.webrtcConns.delete(this.remotePeerId)
|
||||||
|
room.provider.emit('peers', [{
|
||||||
|
removed: [this.remotePeerId],
|
||||||
|
added: [],
|
||||||
|
webrtcPeers: Array.from(room.webrtcConns.keys()),
|
||||||
|
bcPeers: Array.from(room.bcConns)
|
||||||
|
}])
|
||||||
|
}
|
||||||
checkIsSynced(room)
|
checkIsSynced(room)
|
||||||
this.peer.destroy()
|
this.peer.destroy()
|
||||||
log('closed connection to ', logging.BOLD, remotePeerId)
|
log('closed connection to ', logging.BOLD, remotePeerId)
|
||||||
})
|
})
|
||||||
this.peer.on('error', err => {
|
this.peer.on('error', err => {
|
||||||
|
announceSignalingInfo(room)
|
||||||
log('error in connection to ', logging.BOLD, remotePeerId, ': ', err)
|
log('error in connection to ', logging.BOLD, remotePeerId, ': ', err)
|
||||||
})
|
})
|
||||||
this.peer.on('data', data => {
|
this.peer.on('data', data => {
|
||||||
|
@ -181,6 +228,52 @@ export class WebrtcConn {
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
destroy () {
|
||||||
|
this.peer.destroy()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param {Room} room
|
||||||
|
* @param {Uint8Array} m
|
||||||
|
*/
|
||||||
|
const broadcastBcMessage = (room, m) => cryptoutils.encrypt(m, room.key).then(data =>
|
||||||
|
room.mux(() =>
|
||||||
|
bc.publish(room.name, data)
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param {Room} room
|
||||||
|
* @param {Uint8Array} m
|
||||||
|
*/
|
||||||
|
const broadcastRoomMessage = (room, m) => {
|
||||||
|
if (room.bcconnected) {
|
||||||
|
broadcastBcMessage(room, m)
|
||||||
|
}
|
||||||
|
broadcastWebrtcConn(room, m)
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param {Room} room
|
||||||
|
*/
|
||||||
|
const announceSignalingInfo = room => {
|
||||||
|
signalingConns.forEach(conn => {
|
||||||
|
// only subcribe if connection is established, otherwise the conn automatically subscribes to all rooms
|
||||||
|
if (conn.connected) {
|
||||||
|
conn.send({ type: 'subscribe', topics: [room.name] })
|
||||||
|
publishSignalingMessage(conn, room, { type: 'announce', from: room.peerId })
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
const broadcastBcPeerId = room => {
|
||||||
|
// broadcast peerId via broadcastchannel
|
||||||
|
const encoderPeerIdBc = encoding.createEncoder()
|
||||||
|
encoding.writeVarUint(encoderPeerIdBc, messageBcPeerId)
|
||||||
|
encoding.writeUint8(encoderPeerIdBc, 1)
|
||||||
|
encoding.writeVarString(encoderPeerIdBc, room.peerId)
|
||||||
|
broadcastBcMessage(room, encoding.toUint8Array(encoderPeerIdBc))
|
||||||
}
|
}
|
||||||
|
|
||||||
export class Room {
|
export class Room {
|
||||||
|
@ -198,14 +291,124 @@ export class Room {
|
||||||
*/
|
*/
|
||||||
this.peerId = random.uuidv4()
|
this.peerId = random.uuidv4()
|
||||||
this.doc = doc
|
this.doc = doc
|
||||||
|
/**
|
||||||
|
* @type {awarenessProtocol.Awareness}
|
||||||
|
*/
|
||||||
|
this.awareness = provider.awareness
|
||||||
this.provider = provider
|
this.provider = provider
|
||||||
this.synced = false
|
this.synced = false
|
||||||
this.name = name
|
this.name = name
|
||||||
|
// @todo make key secret by scoping
|
||||||
this.key = key
|
this.key = key
|
||||||
/**
|
/**
|
||||||
* @type {Map<string, WebrtcConn>}
|
* @type {Map<string, WebrtcConn>}
|
||||||
*/
|
*/
|
||||||
this.webrtcConns = new Map()
|
this.webrtcConns = new Map()
|
||||||
|
/**
|
||||||
|
* @type {Set<string>}
|
||||||
|
*/
|
||||||
|
this.bcConns = new Set()
|
||||||
|
this.mux = createMutex()
|
||||||
|
this.bcconnected = false
|
||||||
|
/**
|
||||||
|
* @param {ArrayBuffer} data
|
||||||
|
*/
|
||||||
|
this._bcSubscriber = data =>
|
||||||
|
cryptoutils.decrypt(new Uint8Array(data), key).then(m =>
|
||||||
|
this.mux(() => {
|
||||||
|
const reply = readMessage(this, m, () => {})
|
||||||
|
if (reply) {
|
||||||
|
broadcastBcMessage(this, encoding.toUint8Array(reply))
|
||||||
|
}
|
||||||
|
})
|
||||||
|
)
|
||||||
|
/**
|
||||||
|
* Listens to Yjs updates and sends them to remote peers
|
||||||
|
*
|
||||||
|
* @param {Uint8Array} update
|
||||||
|
* @param {any} origin
|
||||||
|
*/
|
||||||
|
this._docUpdateHandler = (update, origin) => {
|
||||||
|
if (origin !== this) {
|
||||||
|
const encoder = encoding.createEncoder()
|
||||||
|
encoding.writeVarUint(encoder, messageSync)
|
||||||
|
syncProtocol.writeUpdate(encoder, update)
|
||||||
|
broadcastRoomMessage(this, encoding.toUint8Array(encoder))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
/**
|
||||||
|
* Listens to Awareness updates and sends them to remote peers
|
||||||
|
*
|
||||||
|
* @param {any} changed
|
||||||
|
* @param {any} origin
|
||||||
|
*/
|
||||||
|
this._awarenessUpdateHandler = ({ added, updated, removed }, origin) => {
|
||||||
|
const changedClients = added.concat(updated).concat(removed)
|
||||||
|
const encoder = encoding.createEncoder()
|
||||||
|
encoding.writeVarUint(encoder, messageAwareness)
|
||||||
|
encoding.writeVarUint8Array(encoder, awarenessProtocol.encodeAwarenessUpdate(this.awareness, changedClients))
|
||||||
|
broadcastRoomMessage(this, encoding.toUint8Array(encoder))
|
||||||
|
}
|
||||||
|
this.doc.on('update', this._docUpdateHandler)
|
||||||
|
this.awareness.on('change', this._awarenessUpdateHandler)
|
||||||
|
window.addEventListener('beforeunload', () => {
|
||||||
|
awarenessProtocol.removeAwarenessStates(this.awareness, [doc.clientID], 'window unload')
|
||||||
|
rooms.forEach(room => {
|
||||||
|
room.disconnect()
|
||||||
|
})
|
||||||
|
})
|
||||||
|
}
|
||||||
|
connect () {
|
||||||
|
// signal through all available signaling connections
|
||||||
|
announceSignalingInfo(this)
|
||||||
|
const roomName = this.name
|
||||||
|
bc.subscribe(roomName, this._bcSubscriber)
|
||||||
|
this.bcconnected = true
|
||||||
|
// broadcast peerId via broadcastchannel
|
||||||
|
broadcastBcPeerId(this)
|
||||||
|
// write sync step 1
|
||||||
|
const encoderSync = encoding.createEncoder()
|
||||||
|
encoding.writeVarUint(encoderSync, messageSync)
|
||||||
|
syncProtocol.writeSyncStep1(encoderSync, this.doc)
|
||||||
|
broadcastBcMessage(this, encoding.toUint8Array(encoderSync))
|
||||||
|
// broadcast local state
|
||||||
|
const encoderState = encoding.createEncoder()
|
||||||
|
encoding.writeVarUint(encoderState, messageSync)
|
||||||
|
syncProtocol.writeSyncStep2(encoderState, this.doc)
|
||||||
|
broadcastBcMessage(this, encoding.toUint8Array(encoderState))
|
||||||
|
// write queryAwareness
|
||||||
|
const encoderAwarenessQuery = encoding.createEncoder()
|
||||||
|
encoding.writeVarUint(encoderAwarenessQuery, messageQueryAwareness)
|
||||||
|
broadcastBcMessage(this, encoding.toUint8Array(encoderAwarenessQuery))
|
||||||
|
// broadcast local awareness state
|
||||||
|
const encoderAwarenessState = encoding.createEncoder()
|
||||||
|
encoding.writeVarUint(encoderAwarenessState, messageAwareness)
|
||||||
|
encoding.writeVarUint8Array(encoderAwarenessState, awarenessProtocol.encodeAwarenessUpdate(this.awareness, [this.doc.clientID]))
|
||||||
|
broadcastBcMessage(this, encoding.toUint8Array(encoderAwarenessState))
|
||||||
|
}
|
||||||
|
disconnect () {
|
||||||
|
// signal through all available signaling connections
|
||||||
|
signalingConns.forEach(conn => {
|
||||||
|
if (conn.connected) {
|
||||||
|
conn.send({ type: 'unsubscribe', topics: [this.name] })
|
||||||
|
}
|
||||||
|
})
|
||||||
|
awarenessProtocol.removeAwarenessStates(this.awareness, [this.doc.clientID], 'disconnect')
|
||||||
|
// broadcast peerId removal via broadcastchannel
|
||||||
|
const encoderPeerIdBc = encoding.createEncoder()
|
||||||
|
encoding.writeVarUint(encoderPeerIdBc, messageBcPeerId)
|
||||||
|
encoding.writeUint8(encoderPeerIdBc, 0) // remove peerId from other bc peers
|
||||||
|
encoding.writeVarString(encoderPeerIdBc, this.peerId)
|
||||||
|
broadcastBcMessage(this, encoding.toUint8Array(encoderPeerIdBc))
|
||||||
|
|
||||||
|
bc.unsubscribe(this.name, this._bcSubscriber)
|
||||||
|
this.bcconnected = false
|
||||||
|
this.doc.off('update', this._docUpdateHandler)
|
||||||
|
this.awareness.off('change', this._awarenessUpdateHandler)
|
||||||
|
this.webrtcConns.forEach(conn => conn.destroy())
|
||||||
|
}
|
||||||
|
destroy () {
|
||||||
|
this.disconnect()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -223,14 +426,6 @@ const openRoom = (doc, provider, name, key) => {
|
||||||
}
|
}
|
||||||
const room = new Room(doc, provider, name, key)
|
const room = new Room(doc, provider, name, key)
|
||||||
rooms.set(name, /** @type {Room} */ (room))
|
rooms.set(name, /** @type {Room} */ (room))
|
||||||
// signal through all available signaling connections
|
|
||||||
signalingConns.forEach(conn => {
|
|
||||||
// only subcribe if connection is established, otherwise the conn automatically subscribes to all rooms
|
|
||||||
if (conn.connected) {
|
|
||||||
conn.send({ type: 'subscribe', topics: [name] })
|
|
||||||
publishSignalingMessage(conn, room, { type: 'announce', from: room.peerId })
|
|
||||||
}
|
|
||||||
})
|
|
||||||
return room
|
return room
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -241,8 +436,8 @@ const openRoom = (doc, provider, name, key) => {
|
||||||
*/
|
*/
|
||||||
const publishSignalingMessage = (conn, room, data) => {
|
const publishSignalingMessage = (conn, room, data) => {
|
||||||
if (room.key) {
|
if (room.key) {
|
||||||
cryptoutils.encrypt(data, room.key).then(data => {
|
cryptoutils.encryptJson(data, room.key).then(data => {
|
||||||
conn.send({ type: 'publish', topic: room.name, data })
|
conn.send({ type: 'publish', topic: room.name, data: buffer.toBase64(data) })
|
||||||
})
|
})
|
||||||
} else {
|
} else {
|
||||||
conn.send({ type: 'publish', topic: room.name, data })
|
conn.send({ type: 'publish', topic: room.name, data })
|
||||||
|
@ -257,6 +452,7 @@ export class SignalingConn extends ws.WebsocketClient {
|
||||||
*/
|
*/
|
||||||
this.providers = new Set()
|
this.providers = new Set()
|
||||||
this.on('connect', () => {
|
this.on('connect', () => {
|
||||||
|
log(`connected (${url})`)
|
||||||
const topics = Array.from(rooms.keys())
|
const topics = Array.from(rooms.keys())
|
||||||
this.send({ type: 'subscribe', topics })
|
this.send({ type: 'subscribe', topics })
|
||||||
rooms.forEach(room =>
|
rooms.forEach(room =>
|
||||||
|
@ -274,29 +470,40 @@ export class SignalingConn extends ws.WebsocketClient {
|
||||||
const execMessage = data => {
|
const execMessage = data => {
|
||||||
const webrtcConns = room.webrtcConns
|
const webrtcConns = room.webrtcConns
|
||||||
const peerId = room.peerId
|
const peerId = room.peerId
|
||||||
if (data == null || data.from === peerId || (data.to !== undefined && data.to !== peerId)) {
|
if (data == null || data.from === peerId || (data.to !== undefined && data.to !== peerId) || room.bcConns.has(data.from)) {
|
||||||
|
// ignore messages that are not addressed to this conn, or from clients that are connected via broadcastchannel
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
const emitPeerChange = webrtcConns.has(data.from) ? () => {} : () =>
|
||||||
|
room.provider.emit('peers', [{
|
||||||
|
removed: [],
|
||||||
|
added: [data.from],
|
||||||
|
webrtcPeers: Array.from(room.webrtcConns.keys()),
|
||||||
|
bcPeers: Array.from(room.bcConns)
|
||||||
|
}])
|
||||||
switch (data.type) {
|
switch (data.type) {
|
||||||
case 'announce':
|
case 'announce':
|
||||||
map.setIfUndefined(webrtcConns, data.from, () => new WebrtcConn(this, true, data.from, room))
|
map.setIfUndefined(webrtcConns, data.from, () => new WebrtcConn(this, true, data.from, room))
|
||||||
|
emitPeerChange()
|
||||||
break
|
break
|
||||||
case 'signal':
|
case 'signal':
|
||||||
if (data.to === peerId) {
|
if (data.to === peerId) {
|
||||||
map.setIfUndefined(webrtcConns, data.from, () => new WebrtcConn(this, false, data.from, room)).peer.signal(data.signal)
|
map.setIfUndefined(webrtcConns, data.from, () => new WebrtcConn(this, false, data.from, room)).peer.signal(data.signal)
|
||||||
|
emitPeerChange()
|
||||||
}
|
}
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (room.key) {
|
if (room.key) {
|
||||||
cryptoutils.decrypt(m.data, room.key).then(execMessage)
|
if (typeof m.data === 'string') {
|
||||||
|
cryptoutils.decryptJson(buffer.fromBase64(m.data), room.key).then(execMessage)
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
execMessage(m.data)
|
execMessage(m.data)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
this.on('connect', () => log(`connected (${url})`))
|
|
||||||
this.on('disconnect', () => log(`disconnect (${url})`))
|
this.on('disconnect', () => log(`disconnect (${url})`))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -316,79 +523,64 @@ export class WebrtcProvider extends Observable {
|
||||||
super()
|
super()
|
||||||
this.roomName = roomName
|
this.roomName = roomName
|
||||||
this.doc = doc
|
this.doc = doc
|
||||||
|
this.awareness = new awarenessProtocol.Awareness(doc)
|
||||||
|
this.shouldConnect = false
|
||||||
|
this.signalingUrls = signaling
|
||||||
this.signalingConns = []
|
this.signalingConns = []
|
||||||
/**
|
/**
|
||||||
* @type {PromiseLike<CryptoKey | null>}
|
* @type {PromiseLike<CryptoKey | null>}
|
||||||
*/
|
*/
|
||||||
this.key = password ? cryptoutils.deriveKey(password, roomName) : /** @type {PromiseLike<null>} */ (promise.resolve(null))
|
this.key = password ? cryptoutils.deriveKey(password, roomName) : /** @type {PromiseLike<null>} */ (promise.resolve(null))
|
||||||
signaling.forEach(url => {
|
|
||||||
const signalingConn = map.setIfUndefined(signalingConns, url, () => new SignalingConn(url))
|
|
||||||
this.signalingConns.push(signalingConn)
|
|
||||||
signalingConn.providers.add(this)
|
|
||||||
})
|
|
||||||
/**
|
/**
|
||||||
* @type {Room|null}
|
* @type {Room|null}
|
||||||
*/
|
*/
|
||||||
this.room = null
|
this.room = null
|
||||||
this.key.then(key => {
|
this.key.then(key => {
|
||||||
this.room = openRoom(doc, this, roomName, key)
|
this.room = openRoom(doc, this, roomName, key)
|
||||||
})
|
if (this.shouldConnect) {
|
||||||
/**
|
this.room.connect()
|
||||||
* @type {awarenessProtocol.Awareness}
|
} else {
|
||||||
*/
|
this.room.disconnect()
|
||||||
this.awareness = new awarenessProtocol.Awareness(doc)
|
|
||||||
/**
|
|
||||||
* Listens to Yjs updates and sends them to remote peers
|
|
||||||
*
|
|
||||||
* @param {Uint8Array} update
|
|
||||||
* @param {any} origin
|
|
||||||
*/
|
|
||||||
this._docUpdateHandler = (update, origin) => {
|
|
||||||
if (this.room !== null && (origin !== this || origin === null)) {
|
|
||||||
const encoder = encoding.createEncoder()
|
|
||||||
encoding.writeVarUint(encoder, messageSync)
|
|
||||||
syncProtocol.writeUpdate(encoder, update)
|
|
||||||
broadcastWebrtcConn(this.room, encoder)
|
|
||||||
}
|
}
|
||||||
}
|
|
||||||
/**
|
|
||||||
* Listens to Awareness updates and sends them to remote peers
|
|
||||||
*
|
|
||||||
* @param {any} changed
|
|
||||||
* @param {any} origin
|
|
||||||
*/
|
|
||||||
this._awarenessUpdateHandler = ({ added, updated, removed }, origin) => {
|
|
||||||
if (this.room !== null) {
|
|
||||||
const changedClients = added.concat(updated).concat(removed)
|
|
||||||
const encoder = encoding.createEncoder()
|
|
||||||
encoding.writeVarUint(encoder, messageAwareness)
|
|
||||||
encoding.writeVarUint8Array(encoder, awarenessProtocol.encodeAwarenessUpdate(this.awareness, changedClients))
|
|
||||||
broadcastWebrtcConn(this.room, encoder)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
this.doc.on('update', this._docUpdateHandler)
|
|
||||||
this.awareness.on('change', this._awarenessUpdateHandler)
|
|
||||||
window.addEventListener('beforeunload', () => {
|
|
||||||
awarenessProtocol.removeAwarenessStates(this.awareness, [doc.clientID], 'window unload')
|
|
||||||
})
|
})
|
||||||
|
this.connect()
|
||||||
}
|
}
|
||||||
destroy () {
|
/**
|
||||||
super.destroy()
|
* @type {boolean}
|
||||||
|
*/
|
||||||
|
get connected () {
|
||||||
|
return this.room !== null && this.shouldConnect
|
||||||
|
}
|
||||||
|
connect () {
|
||||||
|
this.shouldConnect = true
|
||||||
|
this.signalingUrls.forEach(url => {
|
||||||
|
const signalingConn = map.setIfUndefined(signalingConns, url, () => new SignalingConn(url))
|
||||||
|
this.signalingConns.push(signalingConn)
|
||||||
|
signalingConn.providers.add(this)
|
||||||
|
})
|
||||||
|
if (this.room) {
|
||||||
|
this.room.connect()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
disconnect () {
|
||||||
|
this.shouldConnect = false
|
||||||
this.signalingConns.forEach(conn => {
|
this.signalingConns.forEach(conn => {
|
||||||
conn.providers.delete(this)
|
conn.providers.delete(this)
|
||||||
if (conn.providers.size === 0) {
|
if (conn.providers.size === 0) {
|
||||||
conn.destroy()
|
conn.destroy()
|
||||||
signalingConns.delete(this.roomName)
|
signalingConns.delete(this.roomName)
|
||||||
} else {
|
|
||||||
conn.send({ type: 'unsubscribe', topics: [this.roomName] })
|
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
if (this.room) {
|
||||||
|
this.room.disconnect()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
destroy () {
|
||||||
// need to wait for key before deleting room
|
// need to wait for key before deleting room
|
||||||
this.key.then(() => {
|
this.key.then(() => {
|
||||||
|
/** @type {Room} */ (this.room).destroy()
|
||||||
rooms.delete(this.roomName)
|
rooms.delete(this.roomName)
|
||||||
})
|
})
|
||||||
this.doc.off('update', this._docUpdateHandler)
|
|
||||||
this.awareness.off('change', this._awarenessUpdateHandler)
|
|
||||||
super.destroy()
|
super.destroy()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -39,7 +39,8 @@
|
||||||
"baseUrl": "./", /* Base directory to resolve non-absolute module names. */
|
"baseUrl": "./", /* Base directory to resolve non-absolute module names. */
|
||||||
"paths": {
|
"paths": {
|
||||||
"yjs": ["../yjs/src/index.js"],
|
"yjs": ["../yjs/src/index.js"],
|
||||||
"lib0/*": ["../lib0/*"]
|
"lib0/*": ["../lib0/*"],
|
||||||
|
"y-protocols/*": ["../y-protocols/*"]
|
||||||
},
|
},
|
||||||
// "rootDirs": [], /* List of root folders whose combined content represents the structure of the project at runtime. */
|
// "rootDirs": [], /* List of root folders whose combined content represents the structure of the project at runtime. */
|
||||||
// "typeRoots": [], /* List of folders to include type definitions from. */
|
// "typeRoots": [], /* List of folders to include type definitions from. */
|
||||||
|
@ -57,6 +58,8 @@
|
||||||
/* Experimental Options */
|
/* Experimental Options */
|
||||||
// "experimentalDecorators": true, /* Enables experimental support for ES7 decorators. */
|
// "experimentalDecorators": true, /* Enables experimental support for ES7 decorators. */
|
||||||
// "emitDecoratorMetadata": true, /* Enables experimental support for emitting type metadata for decorators. */
|
// "emitDecoratorMetadata": true, /* Enables experimental support for emitting type metadata for decorators. */
|
||||||
"maxNodeModuleJsDepth": 5
|
// "maxNodeModuleJsDepth": 5
|
||||||
}
|
},
|
||||||
|
"exclude": ["../lib0/**/*", "node_modules", "node_modules/**/*", "dist", "dist/**/*.js"],
|
||||||
|
"include": ["./src/**/*", "./tests/**/*"]
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue