Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support for non streaming #1606

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 63 additions & 1 deletion src/routes/conversation/[id]/+server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ export async function POST({ request, locals, params, getClientAddress }) {

const {
inputs: newPrompt,
is_streaming: isStreaming, // boolean flag to check if streaming or non-streaming
id: messageId,
is_retry: isRetry,
is_continue: isContinue,
Expand All @@ -156,6 +157,7 @@ export async function POST({ request, locals, params, getClientAddress }) {
.min(1)
.transform((s) => s.replace(/\r\n/g, "\n"))
),
is_streaming: z.optional(z.boolean()), // added is_streaming
is_retry: z.optional(z.boolean()),
is_continue: z.optional(z.boolean()),
web_search: z.optional(z.boolean()),
Expand Down Expand Up @@ -323,10 +325,71 @@ export async function POST({ request, locals, params, getClientAddress }) {

let lastTokenTimestamp: undefined | Date = undefined;

// redeclaration of hasError
let hasError = false;

// we now build the stream
const stream = new ReadableStream({
async start(controller) {
messageToWriteTo.updates ??= [];
// handle non-streaming requests
if (!isStreaming) {
try {
// Generate the text all at once
const ctx = {
model,
endpoint: await model.getEndpoint(),
conv,
messages: messagesForPrompt,
assistant: undefined,
isContinue: isContinue ?? false,
webSearch: webSearch ?? false,
toolsPreference: toolsPreferences ?? [],
promptedAt,
ip: getClientAddress(),
username: locals.user?.username,
};

let finalResult = "";
const initialMessageContent = "";
for await (const event of textGeneration(ctx)) {
if (event.type === MessageUpdateType.Stream && event.token != "") {
finalResult = event.token;
} else if (event.type === MessageUpdateType.FinalAnswer) {
messageToWriteTo.content = initialMessageContent + event.text;
messageToWriteTo.interrupted = event.interrupted;
}
}

// Update conversation in database
await collections.conversations.updateOne(
{ _id: convId },
{ $set: { messages: conv.messages, title: conv?.title, updatedAt: new Date() } }
);

// Send final result as JSON
controller.enqueue(
JSON.stringify({
content: finalResult,
updates: messageToWriteTo.updates,
interrupted: messageToWriteTo.interrupted ?? false,
}) + "\n"
);
} catch (e) {
hasError = true;
controller.enqueue(
JSON.stringify({
type: MessageUpdateType.Status,
status: MessageUpdateStatus.Error,
message: (e as Error).message,
}) + "\n"
);
logger.error(e);
} finally {
// Make sure stream is closed
controller.close();
}
}
async function update(event: MessageUpdate) {
if (!messageToWriteTo || !conv) {
throw Error("No message or conversation to write events to");
Expand Down Expand Up @@ -419,7 +482,6 @@ export async function POST({ request, locals, params, getClientAddress }) {
);
messageToWriteTo.updatedAt = new Date();

let hasError = false;
const initialMessageContent = messageToWriteTo.content;

try {
Expand Down