Bladeren bron

add: 新增 message

liutian 11 maanden geleden
bovenliggende
commit
403a2ffac5
3 gewijzigde bestanden met toevoegingen van 262 en 22 verwijderingen
  1. 0 1
      packages/core/src/index.js
  2. 253 10
      packages/core/src/shared/MessageHandler.ts
  3. 9 11
      packages/core/src/worker/index.ts

+ 0 - 1
packages/core/src/index.js

@@ -2480,7 +2480,6 @@ class ComPDFKitViewer {
         const url = await this.download(false, null, 3)
         this.triggerPrinting(url);
         return url
-        break;
 
       case "SaveAs":
         this.downloadOrSave();

+ 253 - 10
packages/core/src/shared/MessageHandler.ts

@@ -23,6 +23,10 @@ enum CallbackKind {
   ERROR,
 }
 
+interface PromiseWithOptionalResolvers<T> extends PromiseWithResolvers<T> {
+  resolve: (value?: T | PromiseLike<T>) => void;
+}
+
 interface errorMsg extends Error {
   message: string
   name: string
@@ -40,6 +44,39 @@ interface streamMessage {
   desiredSize: number
   chunk: any
 }
+type MessageData = {
+  sourceName: string;
+  targetName: string;
+  action?: string;
+  callbackId?: number;
+  data?: any;
+  callback?: CallbackKind;
+  reason?: any;
+  stream?: StreamKind;
+  streamId?: number;
+  desiredSize?: number;
+  chunk?: any;
+}
+
+type StreamSink = {
+  enqueue(chunk: any, size?: number, transfers?: StructuredSerializeOptions): void;
+  close(): void;
+  error(reason: any): void;
+  sinkCapability: PromiseCapability<void>;
+  onPull: (() => void) | null;
+  onCancel: ((reason: any) => void) | null;
+  isCancelled: boolean;
+  desiredSize: number;
+  ready: Promise<void> | null;
+}
+
+type StreamController = {
+  controller: ReadableStreamDefaultController<any>;
+  startCall: PromiseWithOptionalResolvers<unknown>;
+  pullCall: PromiseWithOptionalResolvers<unknown> | null;
+  cancelCall: PromiseWithOptionalResolvers<unknown> | null;
+  isClosed: boolean;
+}
 
 type PromiseCapability<T> = {
   promise: Promise<T>;
@@ -74,11 +111,11 @@ function wrapReason(reason: errorMsg) {
 class MessageHandler {
   sourceName: string
   targetName: string
-  comObj: any
+  comObj: MessagePort
   callbackId: number
   streamId: number
-  streamSink: null
-  streamControllers: null
+  streamSinks: { [key: number]: StreamSink }
+  streamControllers: { [key: number]: StreamController }
   callbackCapabilities: { [key: number]: PromiseCapability<any> }
   actionHandler: { [key: string]: Function }
   _onComObjOnMessage: (event: MessageEvent) => void
@@ -89,7 +126,7 @@ class MessageHandler {
     this.comObj = comObj
     this.callbackId = 1
     this.streamId = 1
-    this.streamSink = Object.create(null)
+    this.streamSinks = Object.create(null)
     this.streamControllers = Object.create(null)
     this.callbackCapabilities = Object.create(null)
     this.actionHandler = Object.create(null)
@@ -180,7 +217,7 @@ class MessageHandler {
     actionHandler[actionName] = handler
   }
 
-  send(actionName: string, data: any, transfers?: any[]) {
+  send(actionName: string, data: any, transfers?: StructuredSerializeOptions) {
     this.comObj.postMessage({
       sourceName: this.sourceName,
       targetName: this.targetName,
@@ -189,13 +226,204 @@ class MessageHandler {
     }, transfers)
   }
 
+  async sendWithPromise(actionName: string, data: any, transfers?: StructuredSerializeOptions) {
+    const callbackId = this.callbackId++
+    const capability = Promise.withResolvers()
+    this.callbackCapabilities[callbackId] = capability
+
+    try {
+      this.comObj.postMessage({
+        sourceName: this.sourceName,
+        tartgetName: this.targetName,
+        action: actionName,
+        callbackId,
+        data,
+      },
+      transfers)
+    } catch (ex) {
+      capability.reject(ex)
+    }
+
+    return capability.promise
+  }
+
+  sendWithStream(actionName: string, data: any, queuingStrategy: QueuingStrategy, transfers?: StructuredSerializeOptions) {
+    const streamId = this.streamId++,
+      sourceName = this.sourceName,
+      targetName = this.targetName,
+      comObj = this.comObj
+
+    return new ReadableStream(
+      {
+        start: controller => {
+          const startCabability = Promise.withResolvers()
+
+          this.streamControllers[streamId] = {
+            controller,
+            startCall: startCabability,
+            pullCall: null,
+            cancelCall: null,
+            isClosed: false,
+          }
+
+          comObj.postMessage({
+            sourceName,
+            targetName,
+            action: actionName,
+            streamId,
+            data,
+            deserialize: controller.desiredSize,
+          }, transfers)
+
+          return startCabability.promise
+        },
+
+        pull: controller => {
+          const pullCabability = Promise.withResolvers()
+
+          this.streamControllers[streamId].pullCall = pullCabability
+
+          comObj.postMessage({
+            sourceName,
+            targetName,
+            stream: StreamKind.PULL,
+            streamId,
+            desiredSize: controller.desiredSize,
+          })
+
+          return pullCabability.promise as PromiseLike<void>
+        },
+
+        cancel: reason => {
+          if (!(reason instanceof Error)) {
+            throw new Error('cancel must have a valid reason');
+          }
+
+          const cancelCabability = Promise.withResolvers()
+          this.streamControllers[streamId].cancelCall = cancelCabability
+          this.streamControllers[streamId].isClosed = true
+
+          comObj.postMessage({
+            sourceName,
+            targetName,
+            stream: StreamKind.CANCEL,
+            streamId,
+            reason: wrapReason(reason),
+          })
+
+          return cancelCabability.promise as PromiseLike<void>
+        }
+      },
+      queuingStrategy
+    )
+  }
+
+  #createStreamSink(data: MessageData) {
+    const streamId = data.streamId,
+      sourceName = this.sourceName,
+      targetName = this.targetName,
+      comObj = this.comObj,
+      self = this,
+      action = this.actionHandler[data.action!]
+
+    const streamSink: StreamSink = {
+      enqueue(chunk, size = 1, transfers?: StructuredSerializeOptions) {
+        if (this.isCancelled) {
+          return
+        }
+
+        const lastDesiredSize = this.desiredSize
+        this.desiredSize -= size
+
+        if (lastDesiredSize > 0 && this.desiredSize <= 0) {
+          this.sinkCapability = Promise.withResolvers()
+          this.ready = this.sinkCapability.promise
+        }
+
+        comObj.postMessage({
+          sourceName,
+          targetName,
+          stream: StreamKind.ENQUEUE,
+          streamId,
+          chunk,
+        }, transfers)
+      },
+
+      close() {
+        if (this.isCancelled) {
+          return
+        }
+
+        this.isCancelled = true
+
+        comObj.postMessage({
+          sourceName,
+          targetName,
+          stream: StreamKind.CLOSE,
+          streamId,
+        })
+        streamId && delete self.streamSinks[streamId]
+      },
+
+      error(reason) {
+        if (!(reason instanceof Error)) {
+          throw new Error('error must have a valid reason')
+        }
+
+        this.isCancelled = true
+
+        comObj.postMessage({
+          sourceName,
+          targetName,
+          stream: StreamKind.ERROR,
+          streamId,
+          reason: wrapReason(reason),
+        })
+      },
+
+      sinkCapability: Promise.withResolvers(),
+      onPull: null,
+      onCancel: null,
+      isCancelled: false,
+      desiredSize: data.desiredSize??0,
+      ready: null,
+    }
+
+    streamSink.sinkCapability.resolve()
+    streamSink.ready = streamSink.sinkCapability.promise
+    streamId && (this.streamSinks[streamId] = streamSink)
+
+    new Promise(resolve => {
+      resolve(action(data.data, streamSink))
+    }).then(
+      () => {
+        comObj.postMessage({
+          sourceName,
+          targetName,
+          stream: StreamKind.START_COMPLETE,
+          streamId,
+          success: true,
+        })
+      },
+      reason => {
+        comObj.postMessage({
+          sourceName,
+          targetName,
+          stream: StreamKind.START_COMPLETE,
+          streamId,
+          reason: wrapReason(reason),
+        })
+      }
+    )
+  }
+
   #processStreamMessage(data: streamMessage) {
     const streamId = data.streamId,
       sourceName = this.sourceName,
       targetName = this.sourceName,
       comObj = this.comObj,
       streamController = this.streamControllers![streamId],
-      streamSink = this.streamSink![streamId]
+      streamSink = this.streamSinks![streamId]
 
     switch (data.stream) {
       case StreamKind.START_COMPLETE:
@@ -207,9 +435,9 @@ class MessageHandler {
         break
       case StreamKind.PULL_COMPLETE:
         if (data.success) {
-          streamController.pullCall.resolve()
+          streamController.pullCall!.resolve()
         } else {
-          streamController.pullCall.reject(wrapReason(data.reason))
+          streamController.pullCall!.reject(wrapReason(data.reason))
         }
         break
       case StreamKind.PULL:
@@ -225,7 +453,7 @@ class MessageHandler {
         }
 
         if (streamSink.desiredSize <= 0 && data.desiredSize > 0) {
-          streamId.sinkCapability.resolve()
+          streamSink.sinkCapability.resolve()
         }
 
         streamSink.desiredSize = data.desiredSize
@@ -312,10 +540,25 @@ class MessageHandler {
         )
         streamSink.sinkCapability.reject(wrapReason(data.reason))
         streamSink.isCancelled = true
-        delete this.streamSink[streamId]
+        delete this.streamSinks[streamId]
         break
       default:
         throw new Error(`Unexpected stream case`)
     }
   }
+
+  async #deleteStreamController(streamController: StreamController, streamId: number) {
+    await Promise.allSettled([
+      streamController.startCall?.promise,
+      streamController.pullCall?.promise,
+      streamController.cancelCall?.promise,
+    ]);
+    delete this.streamControllers[streamId]
+  }
+
+  destroy() {
+    this.comObj.removeEventListener("message", this._onComObjOnMessage);
+  }
 }
+
+export default MessageHandler

+ 9 - 11
packages/core/src/worker/index.ts

@@ -1,9 +1,10 @@
+import MessageHandler from "../shared/MessageHandler"
 import { isNodeJS } from "../shared/util"
 
 class WorkerMessageHandler {
-  static setup(handler, port) {
+  static setup(handler: MessageHandler, port: MessagePort) {
     let testMessageProcessed = false;
-    handler.on("test", function (data) {
+    handler.on("test", function (data: ArrayBufferLike) {
       if (testMessageProcessed) {
         return; // we already processed 'test' message once
       }
@@ -11,34 +12,31 @@ class WorkerMessageHandler {
 
       // Ensure that `TypedArray`s can be sent to the worker.
       handler.send("test", data instanceof Uint8Array);
-    });
-
-    handler.on("configure", function (data) {
-      setVerbosityLevel(data.verbosity);
-    });
+    })
 
     handler.on("GetDocRequest", function (data) {
       return WorkerMessageHandler.createDocumentHandler(data, port);
     });
   }
 
-  static initializeFromPort(port) {
+  static initializeFromPort(port: MessagePort) {
     const handler = new MessageHandler("worker", "main", port);
     WorkerMessageHandler.setup(handler, port);
     handler.send("ready", null);
   }
 }
 
-function isMessagePort(maybePort: Window & typeof globalThis): boolean {
+function isMessagePort(maybePort: MessagePort): boolean {
   return(typeof maybePort.postMessage === 'function' && 'onmessage' in maybePort)
 }
 
 // Worker thread (and not Node.js)
+// @ts-ignore
 if (typeof window === 'undefined' &&
   !isNodeJS &&
   typeof self !== 'undefined' &&
-  isMessagePort(self)) {
-    WorkerMessageHandler.initializeFromPort(self)
+  isMessagePort(self as unknown as MessagePort)) {
+    WorkerMessageHandler.initializeFromPort(self as unknown as MessagePort)
 }
 
 export {