Continuing from the last article, we've already deployed our AI-powered real estate listing demo Nuxt application to Cloudflare, including the D1 database and the Workers AI binding. In this article, we will set up Cloudflare Vectorize and implement a Cloudflare Queue to populate the vector store with embeddings for our locations.
This article is the second part of the Nuxt & Cloudflare AI Vector Pipeline Series, a three-part series.
- Part one - Nuxt & Cloudflare Vectorize: Setting up D1, Drizzle, and Workers AI
- Part two - Nuxt & Cloudflare Queues: Building a Data Sync Pipeline using Vectorize
- Part three - Implementing semantic matching in Nuxt with Cloudflare Vectorize
The code for the entire demo application is publicly available on GitHub: Nuxt & Cloudflare AI Vector Pipeline Series - give it a ⭐.
Create a vector index on Cloudflare Vectorize and bind it to our Nuxt application
As briefly explained in the previous article, Vectorize is Cloudflare's native vector store. A vector store is essentially a database designed to hold text embeddings and index them for fast querying. Text embeddings are numeric representations (lists of numbers) that AI models use to read and understand data. Feel free to ask in the comments below if my brief explanation of Vectorize and vector stores leaves you with more questions - I know it's a broad topic.
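To make that a little more concrete, here is a rough, hypothetical illustration of what a single record in a vector store looks like - the values below are made up, and a real embedding contains far more numbers:
// Illustration only: a vector store record pairs an ID with its embedding
// (a fixed-length list of numbers) and optional metadata used for filtering.
const storedVector = {
  id: 'location-123', // hypothetical ID
  values: [0.0123, -0.0876, 0.0441], // real embeddings contain hundreds or thousands of numbers
  metadata: { locationId: 'location-123' },
}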
Cloudflare's Workers AI supports several embedding models; we will use @cf/baai/bge-m3 because of my experience with it on similar projects and its multilingual support, which is ideal for semantic matching use cases.
We will use Wrangler CLI to create the vector index on Cloudflare and bind it to the project through Wrangler's config so we can build the index with the locations' text embeddings later.
To use the bge-m3 embedding model, we need to specify a dimension of 1024. Run:
npx wrangler vectorize create locations-index --dimensions=1024 --metric=cosine
and add the bindings in /wrangler.toml:
[[vectorize]]
binding = "VECTORIZE"
index_name = "locations-index"
Now that we've created the location vector index, we need to enable filtering on our metadata. While upserts work automatically based on ID, we must explicitly index locationId so we can filter search results or delete obsolete records by their location ID in the future.
npx wrangler vectorize create-metadata-index locations-index --property-name=locationId --type=string
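As a quick, hedged sketch of why this matters (assuming a reasonably recent @cloudflare/workers-types and a 1024-dimension query vector produced by the same model), an indexed locationId lets us filter query results, and the matched IDs could later be passed to deleteByIds() to remove obsolete vectors:
import type { VectorizeIndex } from '@cloudflare/workers-types'

// Sketch: restrict query results to a single location via the indexed metadata property.
async function queryByLocation (
  index: VectorizeIndex,
  queryVector: number[], // a 1024-dimension embedding from the same model
  locationId: string,
) {
  return index.query(queryVector, {
    topK: 5,
    filter: { locationId }, // only possible for metadata properties that have a metadata index
  })
}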
We can now build our code and deploy it to make sure our worker has access to the Vectorize index we created:
pnpm run build
npx wrangler deploy
The CLI output should show that our worker has access to Vectorize and AI (which we previously added):
Your Worker has access to the following bindings:
Binding Resource
env.DB (property-sync-db) D1 Database
env.VECTORIZE (locations-index) Vectorize Index
env.ASSETS Assets
env.AI AI
Upserting vectors using Cloudflare Workers AI and Vectorize
We will use a simple repository pattern for Cloudflare Vectorize, allowing us to easily swap implementations if needed. This sounds like overkill for such a small demo application, right? I recently avoided wasting a substantial amount of time when NuxtHub announced they're deprecating their AI features. I was building a similar feature using hubAi() and hubVectorize() from NuxtHub's core module, but I had to refactor it before I finished. Thanks to my use of the repository pattern, I only had to swap the repository implementation from NuxthubVectorAIRepository to CloudflareVectorAIRepository, as all other code was bound to the VectorAIRepository interface.
💡 I don't see anything wrong with enforcing a Repository Pattern throughout the application. I also don't see anything wrong with using the pattern for only some specific dependencies. In this case, I know that there's a chance we will need to change the Vector and AI implementations, but changing the ORM (Drizzle) implementation is less likely. Let me know your thoughts on the Repository Pattern in Nuxt in the comments below.
I will go into detail about Queues and Handlers in the next section of this article. For now, we just need to know that the vectorisation of the location data will be queued, and that a queue handler will depend on a Location Vector Repository. Let's add this dependency.
Create /server/utils/queueHandlers/repositories/LocationVectorRepository.ts with:
import { Location } from '~~/server/database/types'
export interface LocationVectorRepository {
/**
* Upserts locations into the Vector Store.
*
* @param locations - An array of Location objects to be upserted.
* @returns A promise that resolves to the number of locations upserted.
*/
upsertLocations (
locations: Location[],
): Promise<number>;
}
And install the following types from Cloudflare Workers if you haven't already: pnpm install @cloudflare/workers-types.
Database types in /server/database/types.ts:
import * as schema from './schema';
export type Location = typeof schema.locations.$inferSelect;
export type Agent = typeof schema.agents.$inferSelect;
export type Property = typeof schema.properties.$inferSelect;
AI types in /server/types/ai-models.ts:
export interface BGEEmbeddingResponse {
data?: number[][];
response?: number[][];
shape?: number[];
}
We can now finally implement the upsertLocations() method in our Cloudflare implementation of the LocationVectorRepository interface.
Let's start by creating the repository class in /server/repositories/cloudflare/CloudflareVectorAIRepository.ts and add the following contents:
import {
LocationVectorRepository,
} from '~~/server/utils/queueHandlers/repositories/LocationVectorRepository'
import { Location } from '~~/server/database/types'
import type { Ai, VectorizeIndex } from '@cloudflare/workers-types'
import { BGEEmbeddingResponse } from '~~/server/types/ai-models'
export class CloudflareVectorAIRepository implements LocationVectorRepository {
constructor (
private readonly vectorIndex: VectorizeIndex,
private readonly ai: Ai,
) {
if (!vectorIndex) {
throw new Error(
'Missing Vectorize binding.')
}
if (!ai) {
throw new Error('Missing AI binding.')
}
}
/**
* Upserts the given locations into the vector index.
*
* @param locations
*/
public async upsertLocations (locations: Location[]): Promise<number> {
if (locations.length === 0) {
return 0
}
const AI_BATCH_SIZE = 10
let successfulUpserts = 0
for (let i = 0; i < locations.length; i += AI_BATCH_SIZE) {
const batch = locations.slice(i, i + AI_BATCH_SIZE)
try {
const textsToEmbed = batch.map((location) =>
this.buildTextForEmbedding(location),
)
const vectors = await this.embedTextBatch(textsToEmbed)
const vectorObjects = batch.map((location, idx) => {
const vector = vectors[idx]
if (!vector) return null
return {
id: location.id,
values: vector,
metadata: {
locationId: location.id,
},
}
}).filter((obj): obj is NonNullable<typeof obj> => obj !== null)
if (vectorObjects.length > 0) {
const result = await this.vectorIndex.upsert(vectorObjects)
console.log('[Repo] Upsert result:', JSON.stringify(result))
successfulUpserts += vectorObjects.length
}
} catch (error) {
console.error(
`[CloudflareVectorAIRepository] Failed to process batch starting at index ${i}`,
error,
)
}
}
return successfulUpserts
}
/**
* Embeds a batch of texts using the BGE-M3 model.
*
* @param texts
* @private
*/
private async embedTextBatch (texts: string[]): Promise<number[][]> {
const response = (await this.ai.run(
'@cf/baai/bge-m3', {
text: texts,
response_format: 'embedding_vector',
},
)) as BGEEmbeddingResponse
const vectors = response.data ?? response.response
if (!vectors) throw new Error('BGE-M3 invalid response')
return vectors
}
/**
* Builds the text representation for embedding from a Location.
*
* @param location
* @private
*/
private buildTextForEmbedding (location: Location): string {
return `${location.name} | ${location.postcodes.join(', ')}`
}
}
The repository requires Cloudflare's Vector Index and AI bindings in its constructor. We will resolve these from Cloudflare's environment in the queue handler later.
The upsertLocations() method takes a list of locations from our database, builds text embeddings for each location in batches of 10, maps them to vector objects with the indexed locationId metadata, and then upserts the vector objects to the vector store.
I'm intentionally excluding unit and integration tests from this demo application for brevity. Before we can test this repository end-to-end, we need to set up our Cloudflare Queue.
How to dispatch and handle Cloudflare Queues in Nuxt
Queues allow us to process tasks in the background, which is essential for decoupling long-running operations from user requests. Like most queue services, Cloudflare Queues includes features such as batching and retries.
In our Nuxt application, we will implement this in two parts:
- The Producer (Nitro Task): We will dispatch messages containing just the Location ID from our database. To be efficient, we will use sendBatch to dispatch up to ten IDs at a time.
- The Consumer (Nitro Plugin): We will listen for these messages within the same Nuxt application (running as a Cloudflare Worker) using a Nitro plugin to process the data.
Create a Cloudflare Queue and bind the producer and consumer to the same Nuxt application
⚠️ Cloudflare Queues are only available on the paid Workers plan, currently $5 per month plus extra usage charges.
We can create the vector sync queue using Wrangler CLI:
npx wrangler queues create vector-sync-queue
And add the bindings in /wrangler.toml similar to:
[[queues.producers]]
queue = "vector-sync-queue"
binding = "VECTOR_SYNC_QUEUE"
[[queues.consumers]]
queue = "vector-sync-queue"
max_batch_size = 10 # Recommended: Process 10 vectors at a time
max_batch_timeout = 5 # Wait up to 5s to fill a batch
Once you build and deploy, Cloudflare automatically configures your Nuxt Worker to act as the consumer for this queue.
Dispatch to a Cloudflare Queue from a Nuxt application through a Nitro Task
Next, we will write a Nitro Task that dispatches messages to our newly created queue. If you're not following along from the previous article, make sure you add Nitro's experimental tasks and async context to Nuxt's config:
nitro: {
preset: 'cloudflare_module',
experimental: {
tasks: true,
asyncContext: true,
},
},
Let's first add some types in /server/types/queues.ts:
import type {
Ai,
Queue,
VectorizeIndex,
D1Database,
ExecutionContext,
} from '@cloudflare/workers-types'
export interface CloudflareEnv {
AI: Ai;
VECTORIZE: VectorizeIndex;
VECTOR_SYNC_QUEUE: Queue;
DB: D1Database
[key: string]: unknown;
}
export interface VectorSyncQueueMessageBody {
locationId: string;
}
export interface CloudflareTaskContext {
cloudflare: {
env: CloudflareEnv;
context: ExecutionContext;
};
}
And create /server/tasks/build-location-embeddings.ts with:
import { drizzle } from 'drizzle-orm/d1'
import * as schema from '~~/server/database/schema'
import type { D1Database, Queue } from '@cloudflare/workers-types'
import { CloudflareTaskContext } from '~~/server/types/queues'
export default defineTask({
meta: {
name: 'build-location-embeddings',
description: 'Queues all location IDs for vector embedding',
},
async run(event) {
const context = event.context as CloudflareTaskContext
const env = context.cloudflare?.env
if (!env?.DB) {
return { error: 'DB binding not found.' }
}
const db = drizzle(env.DB as D1Database, { schema })
// Queue Init
const queue = env.VECTOR_SYNC_QUEUE as Queue
if (!queue) {
return { error: 'Queue binding (VECTOR_SYNC_QUEUE) not found.' }
}
try {
const allLocations = await db.query.locations.findMany({
columns: { id: true },
})
if (allLocations.length === 0) return { result: 'No locations found in DB.' }
const total = allLocations.length
console.log(`[Task] Found ${total} locations. Dispatching to queue...`)
const messages = allLocations.map((location) => ({
body: { locationId: location.id },
}))
const CHUNK_SIZE = 10
for (let i = 0; i < messages.length; i += CHUNK_SIZE) {
const batch = messages.slice(i, i + CHUNK_SIZE)
await queue.sendBatch(batch)
}
return {
result: `Dispatched ${total} locations to VECTOR_SYNC_QUEUE.`,
}
} catch (error: any) {
console.error('[Task] Error during queue dispatch:', error)
return { error: error.message || 'Unknown error occurred.' }
}
},
})
Now that the dispatcher is done, we can trigger the Nitro task from the CLI, DevTools, CRON, or programmatically. Since Cloudflare's Workers AI runs on remote infrastructure, we will build an API endpoint that triggers the Nitro task programmatically once we've implemented the listener and handler.
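If you do want the CRON route at some point, Nitro's scheduledTasks option can run the task on a schedule. Below is a hedged sketch (the schedule itself is hypothetical); note that on the Cloudflare preset this also needs a matching Cron Trigger configured for the Worker:
// nuxt.config.ts (sketch): runs the task on a schedule instead of on demand
nitro: {
  preset: 'cloudflare_module',
  experimental: {
    tasks: true,
    asyncContext: true,
  },
  scheduledTasks: {
    '0 2 * * *': ['build-location-embeddings'], // hypothetical: every night at 02:00 UTC
  },
},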
💡 There are several ways you can safely test Cloudflare's AI without touching the production environment. You can create bindings specific to a staging environment, and you can also use Wrangler's getPlatformProxy() function in tests.
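A rough sketch of the getPlatformProxy() approach (assuming /wrangler.toml sits in the project root and a recent Wrangler version; the ~~/ aliases may need mapping in your test setup):
import { getPlatformProxy } from 'wrangler'
import { CloudflareVectorAIRepository } from '~~/server/repositories/cloudflare/CloudflareVectorAIRepository'
import type { CloudflareEnv } from '~~/server/types/queues'

// Emulates the bindings declared in wrangler.toml so the repository can be exercised in tests.
const proxy = await getPlatformProxy<CloudflareEnv>()
const repository = new CloudflareVectorAIRepository(proxy.env.VECTORIZE, proxy.env.AI)

// ...call repository.upsertLocations() with a couple of test Location rows here...

await proxy.dispose()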
Listen and handle Cloudflare Queue messages from a Nuxt application using a Nitro Plugin
With the queue dispatcher completed, we now need a Nitro Plugin to listen to the dispatched queue and handle it. In the next part of this article series, we will use another queue to synchronise the Agents' listings. To avoid a massive queue handler class, we will split the listener and handler code.
The following is the handler that depends on the LocationVectorRepository interface we declared earlier.
Create the handler in /server/utils/queueHandlers/VectorSyncQueueHandler.ts with:
import {
LocationVectorRepository,
} from '~~/server/utils/queueHandlers/repositories/LocationVectorRepository'
import { Location } from '~~/server/database/types'
import * as schema from '~~/server/database/schema'
import { drizzle } from 'drizzle-orm/d1'
import { inArray } from 'drizzle-orm'
import type { D1Database, MessageBatch } from '@cloudflare/workers-types'
import type {
VectorSyncQueueMessageBody,
CloudflareEnv,
} from '~~/server/types/queues'
export class VectorSyncQueueHandler {
constructor (
private readonly locationVectorRepository: LocationVectorRepository,
) {}
public async handle (
batch: MessageBatch<VectorSyncQueueMessageBody>,
env: CloudflareEnv,
): Promise<void> {
const messages = batch.messages
const db = drizzle(env.DB as D1Database, { schema })
// Extract Unique IDs
const locationIds = new Set<string>()
for (const message of messages) {
if (message.body?.locationId) {
locationIds.add(message.body.locationId)
}
}
if (locationIds.size === 0) {
console.log('[Vector Sync Handler] No valid locationIds found in batch.')
return
}
try {
const ids = Array.from(locationIds)
// Fetch Data
const locationsToUpsert: Location[] = await db.query.locations.findMany({
where: inArray(schema.locations.id, ids),
})
if (locationsToUpsert.length > 0) {
// Update Vector Index
await this.locationVectorRepository.upsertLocations(locationsToUpsert)
console.log(
`[Vector Sync Handler] Successfully processed ${locationsToUpsert.length} locations.`)
}
} catch (error: any) {
console.error('[Vector Sync Handler] Error:', error.message)
// Throwing error triggers Cloudflare's automatic queue retry logic
throw error
}
}
}
The handler class above loops through the queue messages, retrieves the full location data from our D1 database, and uses the LocationVectorRepository to upsert the data into the Cloudflare Vectorize store.
Finally, we need a Nitro Plugin to hook into and thus listen to Cloudflare Queues. The plugin will be the entry point for all dispatched Cloudflare Queues and will delegate to the specific handler based on the queue name.
Create the plugin at /server/plugins/queue-handler.ts with:
import { VectorSyncQueueHandler } from '~~/server/utils/queueHandlers/VectorSyncQueueHandler'
import { CloudflareVectorAIRepository } from '~~/server/repositories/cloudflare/CloudflareVectorAIRepository'
import type { VectorSyncQueueMessageBody, CloudflareEnv } from '~~/server/types/queues'
import type { MessageBatch } from '@cloudflare/workers-types'
export default defineNitroPlugin((nitroApp) => {
nitroApp.hooks.hook('cloudflare:queue', async (payload) => {
const batch = payload.batch
const env = payload.env as CloudflareEnv
if (batch.queue === 'vector-sync-queue') {
console.log(`[Queue] Received batch of ${batch.messages.length} for ${batch.queue}`)
try {
const repo = new CloudflareVectorAIRepository(env.VECTORIZE, env.AI)
const handler = new VectorSyncQueueHandler(repo)
await handler.handle(
batch as MessageBatch<VectorSyncQueueMessageBody>,
env
)
} catch (error: any) {
console.error('[Queue] Error processing batch:', error)
// This will trigger a retry as per the wrangler.toml queue settings
throw error
}
}
})
})
Populating the Cloudflare Vectorize index from our Nuxt application
We've built everything we need to populate the vector store with our location data, and we're now ready for the most exciting part of this article.
We will use an API endpoint to run it remotely. Create /server/api/internals/tasks/build-location-embeddings.get.ts with:
export default defineEventHandler(async (event) => {
const config = useRuntimeConfig();
if (getHeader(event, 'x-secret') !== config.internalApiSecret) {
throw createError({ statusCode: 401, statusMessage: 'Unauthorized' });
}
const result = await runTask(
'build-location-embeddings',
{
payload: {},
context: {
cloudflare: event.context.cloudflare
}
}
);
return {
status: 'Build Location Embeddings Task Triggered',
result,
};
})
This endpoint triggers our Nitro Task programmatically using the runTask() function. Notice we're using the internal API secret we added as an environment variable to the Cloudflare Worker in the previous article. It's also worth noting that we're setting the context to Cloudflare's context, since the Nitro Task depends on Cloudflare's D1 database and the Vector Sync Queue.
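As a quick recap of that wiring (a sketch based on the previous article - adjust the names to your own setup), the secret lives in runtimeConfig and is overridden on the Worker through Nuxt's default NUXT_ environment variable mapping:
// nuxt.config.ts (sketch)
export default defineNuxtConfig({
  runtimeConfig: {
    // server-only; set NUXT_INTERNAL_API_SECRET on the Cloudflare Worker to override it
    internalApiSecret: '',
  },
})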
With the API endpoint added, go ahead and build, deploy, and test:
pnpm run build
npx wrangler deploy
curl -H "x-secret: [YOUR_SECRET]" "https://[YOUR_WORKER_URL]/api/internals/tasks/build-location-embeddings"
In your Cloudflare Workers dashboard, you can see the endpoint being hit and then the queue consuming the messages. We can also verify that the vector index is populated through Wrangler, though you may need to wait a couple of minutes for the queue to be processed.
npx wrangler vectorize info locations-index
# Should return something like:
⛅️ wrangler 4.53.0
───────────────────
Fetching index info...
┌────────────┬─────────────┬──────────────────────────────────────┬──────────────────────────┐
│ dimensions │ vectorCount │ processedUpToMutation                │ processedUpToDatetime    │
├────────────┼─────────────┼──────────────────────────────────────┼──────────────────────────┤
│ 1024       │ 4           │ aa21b6a6-e25f-4b1c-afa8-98846e801ec6 │ 2025-12-08T09:47:59.146Z │
└────────────┴─────────────┴──────────────────────────────────────┴──────────────────────────┘
npx wrangler vectorize list-vectors locations-index
# Should list the Location ID:
⛅️ wrangler 4.53.0
───────────────────
Listing vectors in index 'locations-index'...
┌───┬──────────────────────────────────────┐
│ # │ Vector ID                            │
├───┼──────────────────────────────────────┤
│ 1 │ 1e212fe9-3994-4c7a-a90f-22df3fdee8d5 │
├───┼──────────────────────────────────────┤
│ 2 │ 0168e16d-0571-466d-809b-d717e58d7cab │
├───┼──────────────────────────────────────┤
│ 3 │ 1818d8a6-baf1-4780-b6d7-a8e6f45b3369 │
├───┼──────────────────────────────────────┤
│ 4 │ 76d29d74-5dfe-4a46-b15d-802e976d5250 │
└───┴──────────────────────────────────────┘
Showing 4 of 4 total vectors
Moving forward with implementing our Semantic Matching feature
The final part of this Nuxt & Cloudflare AI Vector Pipeline Series is where the magic happens: we will implement the location semantic matching to tie everything together.
I hope you're enjoying this series! As always, feel free to ask questions in the comments below or on social media. Please do share this with anyone you think might find it useful, and subscribe if you'd like me to email you when I prepare more code examples and articles.
Peace ✌🏽
Need help with Nuxt & Cloudflare?
Keith Mifsud is an Official Nuxt Agency Partner, learn more.