Pipelines define the schema for the transformation of documents. Different Pipelines can be used for different tasks.
See our guide to Constructing Pipelines for more information on how to create Pipelines.
New Pipelines require a schema. Here are a few examples of schema variations along with common use cases.
For the following section, we will assume we have documents with the structure:
{
"id": "Each document has a unique id",
"title": "Each document has a title",
"body": "Each document has some body text"
}
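For context, documents with this shape would be upserted into a Collection; here is a minimal Python sketch, assuming a configured database connection (for example, via the KORVUS_DATABASE_URL environment variable):
Python:
from korvus import Collection

# Assumes the database connection is configured externally
# (e.g. via the KORVUS_DATABASE_URL environment variable).
collection = Collection("test_collection")
# Upsert documents matching the structure above. Any Pipelines
# already added to the Collection will process them automatically.
await collection.upsert_documents([
    {
        "id": "doc_1",
        "title": "Example title",
        "body": "Some body text to split and embed.",
    },
])
The first schema variation enables hybrid search over these documents: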
JavaScript:
const pipeline = korvus.newPipeline("test_pipeline", {
title: {
full_text_search: { configuration: "english" },
},
body: {
splitter: { model: "recursive_character" },
semantic_search: {
model: "Alibaba-NLP/gte-base-en-v1.5",
},
},
});
Python:
pipeline = Pipeline(
"test_pipeline",
{
"title": {
"full_text_search": {"configuration": "english"},
},
"body": {
"splitter": {"model": "recursive_character"},
"semantic_search": {
"model": "Alibaba-NLP/gte-base-en-v1.5",
},
},
},
)
Rust:
let mut pipeline = Pipeline::new(
"test_pipeline",
Some(
serde_json::json!({
"title": {
"full_text_search": {"configuration": "english"},
},
"body": {
"splitter": {"model": "recursive_character"},
"semantic_search": {
"model": "Alibaba-NLP/gte-base-en-v1.5",
},
},
})
.into(),
),
)?;
C:
PipelineC * pipeline = korvus_pipelinec_new(
"test_pipeline",
"{\
\"title\": {\
\"full_text_search\": {\"configuration\": \"english\"},\
},\
\"body\": {\
\"splitter\": {\"model\": \"recursive_character\"},\
\"semantic_search\": {\
\"model\": \"Alibaba-NLP/gte-base-en-v1.5\"\
}\
}\
}"
);
This Pipeline does two things. For each document in the Collection, it converts all titles into tsvectors, enabling full text search, and splits and embeds the body text, enabling semantic search using vectors. This kind of Pipeline would be great for site search utilizing hybrid keyword and semantic search.
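To illustrate, a hybrid query against this Pipeline might look like the following Python sketch; the exact query shape and the boost parameters are assumptions here, so consult the search documentation for the authoritative form:
Python:
# Hypothetical hybrid query combining keyword and semantic search.
# The field names match the schema above; the "boost" weights and
# overall query structure are assumptions.
results = await collection.search(
    {
        "query": {
            "full_text_search": {"title": {"query": "postgres", "boost": 1.2}},
            "semantic_search": {"body": {"query": "postgres", "boost": 1.0}},
        },
        "limit": 10,
    },
    pipeline,
)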
For a simpler RAG use case, the following Pipeline would work well.
JavaScript:
const pipeline = korvus.newPipeline("test_pipeline", {
body: {
splitter: { model: "recursive_character" },
semantic_search: {
model: "Alibaba-NLP/gte-base-en-v1.5",
},
},
});
Python:
pipeline = Pipeline(
"test_pipeline",
{
"body": {
"splitter": {"model": "recursive_character"},
"semantic_search": {
"model": "Alibaba-NLP/gte-base-en-v1.5",
},
},
},
)
Rust:
let mut pipeline = Pipeline::new(
"test_pipeline",
Some(
serde_json::json!({
"body": {
"splitter": {"model": "recursive_character"},
"semantic_search": {
"model": "Alibaba-NLP/gte-base-en-v1.5",
},
},
})
.into(),
),
)?;
C:
PipelineC * pipeline = korvus_pipelinec_new(
"test_pipeline",
"{\
\"body\": {\
\"splitter\": {\"model\": \"recursive_character\"},\
\"semantic_search\": {\
\"model\": \"Alibaba-NLP/gte-base-en-v1.5\"\
}\
}\
}"
);
This Pipeline splits and embeds the body text, enabling semantic search using vectors. This is a very popular Pipeline for RAG.
We support almost every open source model on Hugging Face, as well as OpenAI's embedding models. To use a model from OpenAI, specify the source as openai, and make sure to set the environment variable OPENAI_API_KEY.
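One way to provide the key is to export it in your shell before starting your application, or to set it from code before the Pipeline runs. A minimal Python sketch, assuming the openai source reads the key from the process environment at embedding time:
Python:
import os
# Assumption: the openai source reads OPENAI_API_KEY from the
# process environment when embeddings are generated.
os.environ["OPENAI_API_KEY"] = "your-api-key-here"  # placeholder value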
JavaScript:
const pipeline = korvus.newPipeline("test_pipeline", {
body: {
splitter: { model: "recursive_character" },
semantic_search: {
model: "text-embedding-ada-002",
source: "openai"
},
},
});
Python:
pipeline = Pipeline(
"test_pipeline",
{
"body": {
"splitter": {"model": "recursive_character"},
"semantic_search": {"model": "text-embedding-ada-002", "source": "openai"},
},
},
)
Rust:
let mut pipeline = Pipeline::new(
"test_pipeline",
Some(
serde_json::json!({
"body": {
"splitter": {"model": "recursive_character"},
"semantic_search": {
"model": "text-embedding-ada-002",
"source": "openai"
},
},
})
.into(),
),
)?;
C:
PipelineC * pipeline = korvus_pipelinec_new(
"test_pipeline",
"{\
\"body\": {\
\"splitter\": {\"model\": \"recursive_character\"},\
\"semantic_search\": {\
\"model\": \"text-embedding-ada-002\",\
\"source\": \"openai\"\
}\
}\
}"
);
By default, the SDK uses HNSW indexes to efficiently perform vector recall. The default HNSW index sets m to 16 and ef_construction to 64. These defaults can be customized in the Pipeline schema; higher values generally improve recall at the cost of slower index builds and more memory. See pgvector for more information on vector indexes.
JavaScript:
const pipeline = korvus.newPipeline("test_pipeline", {
body: {
splitter: { model: "recursive_character" },
semantic_search: {
model: "Alibaba-NLP/gte-base-en-v1.5",
hnsw: {
m: 100,
ef_construction: 200
}
},
},
});
Python:
pipeline = Pipeline(
"test_pipeline",
{
"body": {
"splitter": {"model": "recursive_character"},
"semantic_search": {
"model": "Alibaba-NLP/gte-base-en-v1.5",
"hnsw": {"m": 100, "ef_construction": 200},
},
},
},
)
Rust:
let mut pipeline = Pipeline::new(
"test_pipeline",
Some(
serde_json::json!({
"body": {
"splitter": {"model": "recursive_character"},
"semantic_search": {
"model": "Alibaba-NLP/gte-base-en-v1.5",
"hnsw": {"m": 100, "ef_construction": 200}
},
},
})
.into(),
),
)?;
C:
PipelineC * pipeline = korvus_pipelinec_new(
"test_pipeline",
"{\
\"body\": {\
\"splitter\": {\"model\": \"recursive_character\"},\
\"semantic_search\": {\
\"model\": \"Alibaba-NLP/gte-base-en-v1.5\",\
\"hnsw\": {\"m\": 100, \"ef_construction\": 200}\
}\
}\
}"
);
The first time a Pipeline is added to a Collection, it will automatically chunk and embed any documents already in that Collection.
JavaScript:
await collection.add_pipeline(pipeline)
Python:
await collection.add_pipeline(pipeline)
Rust:
collection.add_pipeline(&mut pipeline).await?;
C:
korvus_collectionc_add_pipeline(collection, pipeline);
Note: after a Pipeline has been added to a Collection, instances of the Pipeline object can be created without specifying a schema:
JavaScript:
const pipeline = korvus.newPipeline("test_pipeline")
Python:
pipeline = Pipeline("test_pipeline")
Rust:
let mut pipeline = Pipeline::new("test_pipeline", None)?;
C:
PipelineC * pipeline = korvus_pipelinec_new("test_pipeline", NULL);
There are two different forms of search that can be performed after adding a Pipeline to a Collection. See their respective pages for more information on searching.
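As a quick preview, a semantic query over the body field might look like the following Python sketch; the query shape here is an assumption, so see the vector search documentation for the authoritative form:
Python:
# Hypothetical semantic query over the "body" field defined in the
# Pipeline schema; the query structure is an assumption.
results = await collection.vector_search(
    {
        "query": {
            "fields": {
                "body": {"query": "What is the best database?"},
            },
        },
        "limit": 5,
    },
    pipeline,
)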
Pipelines can be disabled or removed to prevent them from running automatically when documents are upserted.
JavaScript:
const pipeline = korvus.newPipeline("test_pipeline")
const collection = korvus.newCollection("test_collection")
await collection.disable_pipeline(pipeline)
Python:
pipeline = Pipeline("test_pipeline")
collection = Collection("test_collection")
await collection.disable_pipeline(pipeline)
Rust:
let mut collection = Collection::new("test_collection", None)?;
let mut pipeline = Pipeline::new("test_pipeline", None)?;
collection.disable_pipeline(&mut pipeline).await?;
C:
CollectionC * collection = korvus_collectionc_new("test_collection", NULL);
PipelineC * pipeline = korvus_pipelinec_new("test_pipeline", NULL);
korvus_collectionc_disable_pipeline(collection, pipeline);
Disabling a Pipeline prevents it from running automatically, but leaves all tsvectors, chunks, and embeddings already created by that Pipeline in the database.
Disabled Pipelines can be re-enabled:
JavaScript:
const pipeline = korvus.newPipeline("test_pipeline")
const collection = korvus.newCollection("test_collection")
await collection.enable_pipeline(pipeline)
Python:
pipeline = Pipeline("test_pipeline")
collection = Collection("test_collection")
await collection.enable_pipeline(pipeline)
Rust:
let mut collection = Collection::new("test_collection", None)?;
let mut pipeline = Pipeline::new("test_pipeline", None)?;
collection.enable_pipeline(&mut pipeline).await?;
C:
CollectionC * collection = korvus_collectionc_new("test_collection", NULL);
PipelineC * pipeline = korvus_pipelinec_new("test_pipeline", NULL);
korvus_collectionc_enable_pipeline(collection, pipeline);
Enabling a Pipeline will cause it to automatically run on all documents it may have missed while disabled.
Pipelines can also be removed entirely:
JavaScript:
const pipeline = korvus.newPipeline("test_pipeline")
const collection = korvus.newCollection("test_collection")
await collection.remove_pipeline(pipeline)
Python:
pipeline = Pipeline("test_pipeline")
collection = Collection("test_collection")
await collection.remove_pipeline(pipeline)
Rust:
let mut collection = Collection::new("test_collection", None)?;
let mut pipeline = Pipeline::new("test_pipeline", None)?;
collection.remove_pipeline(&mut pipeline).await?;
C:
CollectionC * collection = korvus_collectionc_new("test_collection", NULL);
PipelineC * pipeline = korvus_pipelinec_new("test_pipeline", NULL);
korvus_collectionc_remove_pipeline(collection, pipeline);
Removing a Pipeline deletes it and all associated data from the database. Removed Pipelines cannot be re-enabled, but they can be recreated.
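Recreating a removed Pipeline works the same as creating it the first time: construct it with its schema and add it back to the Collection. A minimal Python sketch:
Python:
# Recreate the Pipeline with its original schema and re-add it;
# documents in the Collection will be chunked and embedded again.
pipeline = Pipeline(
    "test_pipeline",
    {
        "body": {
            "splitter": {"model": "recursive_character"},
            "semantic_search": {"model": "Alibaba-NLP/gte-base-en-v1.5"},
        },
    },
)
await collection.add_pipeline(pipeline)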