2024-04-20 19:03:52 +00:00
type TextStreamUpdate = {
done : boolean ;
value : string ;
} ;
// createOpenAITextStream takes a ReadableStreamDefaultReader from an SSE response,
// and returns an async generator that emits delta updates with large deltas chunked into random sized chunks
export async function createOpenAITextStream (
2024-04-21 09:45:07 +00:00
messageStream : ReadableStreamDefaultReader ,
splitLargeDeltas : boolean
2024-04-20 19:03:52 +00:00
) : Promise < AsyncGenerator < TextStreamUpdate > > {
2024-04-21 09:45:07 +00:00
let iterator = openAIStreamToIterator ( messageStream ) ;
if ( splitLargeDeltas ) {
iterator = streamLargeDeltasAsRandomChunks ( iterator ) ;
}
return iterator ;
2024-04-20 19:03:52 +00:00
}
async function * openAIStreamToIterator (
reader : ReadableStreamDefaultReader
) : AsyncGenerator < TextStreamUpdate > {
while ( true ) {
const { value , done } = await reader . read ( ) ;
if ( done ) {
yield { done : true , value : '' } ;
break ;
}
const lines = value . split ( '\n' ) ;
for ( const line of lines ) {
if ( line !== '' ) {
console . log ( line ) ;
if ( line === 'data: [DONE]' ) {
yield { done : true , value : '' } ;
2024-04-26 13:45:29 +00:00
} else if ( line . startsWith ( ':' ) ) {
2024-04-26 13:37:18 +00:00
// Events starting with : are comments https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events/Using_server-sent_events#event_stream_format
// OpenRouter sends heartbeats like ": OPENROUTER PROCESSING"
continue
2024-04-20 19:03:52 +00:00
} else {
2024-04-26 17:38:25 +00:00
try {
const data = JSON . parse ( line . replace ( /^data: / , '' ) ) ;
console . log ( data ) ;
2024-04-20 19:03:52 +00:00
2024-04-26 17:42:43 +00:00
yield { done : false , value : data.choices?. [ 0 ] ? . delta ? . content ? ? '' } ;
2024-04-26 17:38:25 +00:00
} catch ( e ) {
console . error ( 'Error extracting delta from SSE event:' , e ) ;
}
2024-04-20 19:03:52 +00:00
}
}
}
}
}
// streamLargeDeltasAsRandomChunks will chunk large deltas (length > 5) into random sized chunks between 1-3 characters
// This is to simulate a more fluid streaming, even though some providers may send large chunks of text at once
async function * streamLargeDeltasAsRandomChunks (
iterator : AsyncGenerator < TextStreamUpdate >
) : AsyncGenerator < TextStreamUpdate > {
for await ( const textStreamUpdate of iterator ) {
if ( textStreamUpdate . done ) {
yield textStreamUpdate ;
return ;
}
let content = textStreamUpdate . value ;
if ( content . length < 5 ) {
yield { done : false , value : content } ;
continue ;
}
while ( content != '' ) {
const chunkSize = Math . min ( Math . floor ( Math . random ( ) * 3 ) + 1 , content . length ) ;
const chunk = content . slice ( 0 , chunkSize ) ;
yield { done : false , value : chunk } ;
await sleep ( 5 ) ;
content = content . slice ( chunkSize ) ;
}
}
}
const sleep = ( ms : number ) = > new Promise ( ( resolve ) = > setTimeout ( resolve , ms ) ) ;