Replies: 6 comments 4 replies
-
I'd love this, but I don't have the bandwidth to make it happen. Keen on community contributions |
Beta Was this translation helpful? Give feedback.
-
I would propose writing a more general intermediate adapter for creating further subscription based adapters. It would need to support websocket similar methods like onconnect, onmessage, onerror and onclose, as well as callbacks for storing/mutating subscriptions (e.g. DO can't persist the Unsubscribeable interface easily so you would have to implement some custom logic for storing subscriptions) and a callback for sending a message to the client (since this differs between e.g. This should make it easier to also support other transports later e.g. WebTransport? |
Beta Was this translation helpful? Give feedback.
-
Currently i see the main problem with the subscription model, since it is not compatible with serverless. The Observeables are only loaded when the We would have to create a something like a |
Beta Was this translation helpful? Give feedback.
-
I have created a PoC tho it still uses the non persisteable non observeable reloading method of storing subscriptions (but i already added some api interface for that). trpc-ws-utils.ts imports ...
export type CreateContextFnOptions<TRes> = Omit<
NodeHTTPCreateContextFnOptions<IncomingMessage, TRes>,
'info'
>;
export type CreateContextFn<TRouter extends AnyRouter> = (
opts: CreateContextFnOptions<any>
) => MaybePromise<inferRouterContext<TRouter>>;
export type TransportConnection = {
send: (data: string) => MaybePromise<void>,
close: () => MaybePromise<void>,
isOpen: () => boolean,
subs: Subscriptions
}
export type Subscriptions = {
get: () => Map<number | string, Subscription>,
has: (id: number | string) => boolean
add: (sub: Subscription) => void
clear: () => void
}
export type Subscription = {
id: number | string,
sub?: Unsubscribable,
subInfo?: SubscriptionInfo
}
export type SubscriptionInfo = {
path: string,
type: string,
}
export type TrpcSubscriptionMap = Map<string, Map<number | string, Subscription>>;
export const getTrpcWsUtils = async <TRouter extends AnyRouter, TRequest extends IncomingMessage>(
opts:
BaseHandlerOptions<TRouter, IncomingMessage> &
CreateContextCallback<
inferRouterContext<TRouter>,
CreateContextFn<TRouter>
> &
{
req: TRequest,
currentTransport: TransportConnection,
getAllConnectedTransports: () => MaybePromise<TransportConnection[]>
}
) => {
const { createContext, router, req, currentTransport, getAllConnectedTransports } = opts;
const { transformer } = router._def._config;
const ctxPromise = createContext?.({ req, res: null });
let ctx: inferRouterContext<TRouter> | undefined = undefined;
function respond(untransformedJSON: TRPCResponseMessage) {
currentTransport.send(
JSON.stringify(
transformTRPCResponse(router._def._config, untransformedJSON)
)
);
}
async function createContextAsync() {
try {
ctx = await ctxPromise;
} catch (cause) {
const error = getTRPCErrorFromUnknown(cause);
opts.onError?.({
error,
path: undefined,
type: 'unknown',
ctx,
req,
input: undefined
});
//TODO: implement respond with client
respond({
id: null,
error: getErrorShape({
config: router._def._config,
error,
type: 'unknown',
path: undefined,
input: undefined,
ctx
})
});
// close in next tick
(global.setImmediate ?? global.setTimeout)(() => {
close();
});
}
}
function stopSubscription(
subscription: Unsubscribable,
{ id, jsonrpc }: JSONRPC2.BaseEnvelope & { id: JSONRPC2.RequestId }
) {
subscription.unsubscribe();
respond({
id,
jsonrpc,
result: {
type: 'stopped'
}
});
}
async function handleRequest(msg: TRPCClientOutgoingMessage) {
const { id, jsonrpc } = msg;
/* istanbul ignore next -- @preserve */
console.log({ msg });
if (id === null) {
throw new TRPCError({
code: 'BAD_REQUEST',
message: '`id` is required'
});
}
if (msg.method === 'subscription.stop') {
// TODO
/*const sub = clientSubscriptions.get(id);
if (sub) {
stopSubscription(sub, { id, jsonrpc });
}
clientSubscriptions.delete(id);*/
return;
}
const { path, input } = msg.params;
const type = msg.method;
try {
await ctxPromise; // asserts context has been set
const result = await callProcedure({
procedures: router._def.procedures,
path,
getRawInput: async () => input,
ctx,
type
});
// check subscription to be observeable
if (type === 'subscription') {
if (!isObservable(result)) {
throw new TRPCError({
message: `Subscription ${path} did not return an observable`,
code: 'INTERNAL_SERVER_ERROR'
});
}
} else {
// send the value as data if the method is not a subscription
respond({
id,
jsonrpc,
result: {
type: 'data',
data: result
}
});
return;
}
const observable: Observable<any, any> = result;
const sub = observable.subscribe({
next(data) {
respond({
id,
jsonrpc,
result: {
type: 'data',
data
}
});
},
error(err) {
const error = getTRPCErrorFromUnknown(err);
opts.onError?.({ error, path, type, ctx, req, input });
respond({
id,
jsonrpc,
error: getErrorShape({
config: router._def._config,
error,
type,
path,
input,
ctx
})
});
},
complete() {
respond({
id,
jsonrpc,
result: {
type: 'stopped'
}
});
}
});
/* istanbul ignore next -- @preserve */
if (!currentTransport.isOpen()) {
// if the client got disconnected whilst initializing the subscription
// no need to send stopped message if the client is disconnected
sub.unsubscribe();
return;
}
/* istanbul ignore next -- @preserve */
if (currentTransport.subs.has(id)) {
// duplicate request ids for client
stopSubscription(sub, { id, jsonrpc });
throw new TRPCError({
message: `Duplicate id ${id}`,
code: 'BAD_REQUEST'
});
}
currentTransport.subs.add({ id, sub, subInfo: { path, type } });
respond({
id,
jsonrpc,
result: {
type: 'started'
}
});
} catch (cause) /* istanbul ignore next -- @preserve */ {
// procedure threw an error
const error = getTRPCErrorFromUnknown(cause);
opts.onError?.({ error, path, type, ctx, req, input });
respond({
id,
jsonrpc,
error: getErrorShape({
config: router._def._config,
error,
type,
path,
input,
ctx
})
});
}
}
await createContextAsync();
return {
handleMessage: async (message: string) => {
try {
// eslint-disable-next-line @typescript-eslint/no-base-to-string
const msgJSON: unknown = JSON.parse(message.toString());
const msgs: unknown[] = Array.isArray(msgJSON) ? msgJSON : [msgJSON];
const promises = msgs
.map((raw) => parseTRPCMessage(raw, transformer))
.map(handleRequest);
await Promise.all(promises);
} catch (cause) {
const error = new TRPCError({
code: 'PARSE_ERROR',
cause
});
respond({
id: null,
error: getErrorShape({
config: router._def._config,
error,
type: 'unknown',
path: undefined,
input: undefined,
ctx: undefined
})
});
}
},
handleError: (cause: any) => {
opts.onError?.({
ctx,
error: getTRPCErrorFromUnknown(cause),
input: undefined,
path: undefined,
type: 'unknown',
req
});
},
handleClose: () => {
for (const sub of currentTransport.subs.get().values()) {
sub.sub?.unsubscribe();
}
currentTransport.subs.clear();
},
broadcastReconnectNotification: async () => {
const response: TRPCReconnectNotification = {
id: null,
method: 'reconnect'
};
const data = JSON.stringify(response);
for (const connection of await getAllConnectedTransports()) {
if (connection.isOpen()) {
connection.send(data);
}
}
}
};
};
export function cloudflareTrpcUtils<TRouter extends AnyRouter>(router: TRouter, ws: WebSocket, ctx: DurableObjectState, subscriptionsMap: TrpcSubscriptionMap) {
console.log("Loading cloudflareTrpcUtils with:", {subscriptionsMap});
function getWsTag(ws: WebSocket) {
const tag = ctx.getTags(ws).find((tag) => tag.startsWith('ws-trpc-sub-id-'));
if (!tag) {
throw new Error('No subscription id tag found for websocket');
}
return tag;
}
function getSubscriptionMap(ws: WebSocket) {
const tag = getWsTag(ws);
if (!subscriptionsMap.has(tag)) {
subscriptionsMap.set(tag, new Map<number | string, Subscription>());
}
return subscriptionsMap.get(tag)!;
}
return getTrpcWsUtils<TRouter, null>({
createContext: async () => ({ req: null, res: null, ctx }),
router,
req: null,
currentTransport: {
send: (data) => ws.send(data),
close: () => ws.close(),
isOpen: () => ws.readyState === ws.OPEN,
subs: {
get: () => getSubscriptionMap(ws),
has: (id) => getSubscriptionMap(ws).has(id),
add: (sub) => getSubscriptionMap(ws).set(sub.id, sub),
clear: () => subscriptionsMap.delete(getWsTag(ws))
}
},
getAllConnectedTransports: () => ctx.getWebSockets().map((ws) => ({
send: (data) => ws.send(data),
close: () => ws.close(),
isOpen: () => ws.readyState === ws.OPEN,
subs: {
get: () => getSubscriptionMap(ws),
has: (id) => getSubscriptionMap(ws).has(id),
add: (sub) => getSubscriptionMap(ws).set(sub.id, sub),
clear: () => subscriptionsMap.delete(getWsTag(ws))
}
}))
});
} durable-object.ts imports ...
import { cloudflareTrpcUtils, TrpcSubscriptionMap } from './trpc-ws-utils.ts';
const trpc = initTRPC.context<{ ctx: DurableObjectState }>().create();
const router = trpc.router;
const procedure = trpc.procedure;
const listeners = {
numberChange: new Set<(newValue: number) => void>()
};
const doRouter = router({
onChange: procedure.subscription(() => {
return observable<number>((emit) => {
const sendChange = (data: number) => {
console.log('Sending change', data);
emit.next(data);
};
listeners.numberChange.add(sendChange);
return () => {
listeners.numberChange.delete(sendChange);
};
});
}),
inc: procedure.mutation(async (opts) => {
const ctx = opts.ctx.ctx;
const value = await ctx.storage.get('counter');
const newValue = (value as number || 0) + 1;
await ctx.storage.put('counter', newValue);
listeners.numberChange.forEach((listener) => listener(newValue));
})
});
export type DoRouter = typeof doRouter;
const subscriptionMap: TrpcSubscriptionMap = new Map();
export class TestDurableObject extends DurableObject {
constructor(state: DurableObjectState, env: Env) {
super(state, env);
}
async fetch(_request: Request): Promise<Response> {
// Creates two ends of a WebSocket connection.
const [client, server] = Object.values(new WebSocketPair());
// Stores WS connection in hibernation api.
this.ctx.acceptWebSocket(server, ['ws-trpc-sub-id-'+ Math.random().toString()]);
// Returns the client end of the WebSocket connection to the client.
return new Response(null, {
status: 101,
webSocket: client
});
}
async webSocketMessage(ws: WebSocket, message: ArrayBuffer | string) {
console.log(`Received message: ${message}`);
await (await cloudflareTrpcUtils(doRouter, ws, this.ctx, subscriptionMap)).handleMessage(message.toString());
}
async webSocketError(ws: WebSocket, error: any) {
console.log('WebSocket error:', error);
(await cloudflareTrpcUtils(doRouter, ws, this.ctx, subscriptionMap)).handleError(error);
}
async webSocketClose(ws: WebSocket, _code: number, _reason: string, _wasClean: boolean) {
console.log('WebSocket closed');
(await cloudflareTrpcUtils(doRouter, ws, this.ctx, subscriptionMap)).handleClose();
}
} This actually works tho it shouldn't survive hibernation because the subscription map will probably be "wiped". Also this is pretty hacked together and we would have to improve the api surface by a lot if this were to replace the current ws adapter. A pro for writing a more general adapter in this style for subscription backends would be the easier maintainability of more adapters and "plugin" support for community adapters. As a last thought: Theoretically all current adapters could be rewritten to use this intermediate adapter (some obviously lacking support for subscriptions) |
Beta Was this translation helpful? Give feedback.
-
@KATT since it seems feasible to implement, should I open an issue for this ? |
Beta Was this translation helpful? Give feedback.
-
Opened draft #5731 |
Beta Was this translation helpful? Give feedback.
-
Cloudflare Worker's has a feature called Durable Objects which can connect multiple clients over websockets to the same cloudflare worker instance. https://developers.cloudflare.com/workers/learning/using-websockets/ and https://developers.cloudflare.com/workers/learning/using-websockets/#durable-objects-and-websocket-state
I wonder if it's possible to run tRPC on the Cloudflare Worker Durable Object. There is complexity around using this though. Clients don't connect to the durable object worker directly, instead they connect to a cloudflare worker which then connects to another cloudflare worker acting as the durable object.
This would be a nice way to have serverless tRPC over websockets. Somewhat related to #3850
I'll spend some time thinking about this, but would appreciate other people's thoughts. Edit: I just found https://github.com/cleaton/cloudflare-trpc-websocket, which seems interesting
🙏
Beta Was this translation helpful? Give feedback.
All reactions