Advanced Usage
Multiple Metrics
Run multiple metrics on the same dataset:

metrics = [
    Metric(
        name="Helpfulness",
        prompt="Rate helpfulness: @CURRENT_MESSAGE.output",
        evaluation_level=EvaluationLevel.MESSAGE,
        output_type=OutputType.PROGRESS,
    ),
    Metric(
        name="Clarity",
        prompt="Rate clarity: @CURRENT_MESSAGE.output",
        evaluation_level=EvaluationLevel.MESSAGE,
        output_type=OutputType.PROGRESS,
    ),
    Metric(
        name="Accuracy",
        prompt="Rate accuracy: @CURRENT_MESSAGE.output",
        evaluation_level=EvaluationLevel.MESSAGE,
        output_type=OutputType.PROGRESS,
    ),
]

all_results = {}
for metric in metrics:
    results = await client.evaluate(
        dataset_id=dataset_id,
        metric=metric,
    )
    all_results[metric.name] = results
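If concurrent calls to the client are acceptable in your setup, the same loop can be collapsed into parallel requests with asyncio.gather. This is a sketch, not a documented pattern; whether client.evaluate() may be called concurrently is an assumption to verify:

import asyncio

# Launch one evaluate() call per metric and wait for all of them to finish.
# Assumption: client.evaluate() tolerates concurrent calls.
result_lists = await asyncio.gather(
    *(client.evaluate(dataset_id=dataset_id, metric=m) for m in metrics)
)
all_results = {m.name: r for m, r in zip(metrics, result_lists)}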
Manual Orchestration
For more control over the evaluation process:

from turnwise import EvaluationOrchestrator, OpenRouterProvider

# Create provider
provider = OpenRouterProvider(api_key="sk-or-xxx")

# Create orchestrator
orchestrator = EvaluationOrchestrator(
    llm_provider=provider,
    default_model="openai/gpt-4o-mini",
    extract_goals=True,
)

# Get conversations
conversations = await client.get_conversations(dataset_id=1)

# Evaluate single conversation
results = await orchestrator.evaluate_conversation(
    conversation=conversations[0],
    metric=metric,
)

# Manually sync when ready
await client.sync_results(results)
Disable Auto-Sync
Run evaluations without syncing to TurnWise:

results = await client.evaluate(
    dataset_id=dataset_id,
    metric=metric,
    auto_sync=False,  # Results stay local
)

# Process results locally
for result in results:
    print(f"Entity {result.entity_id}: {result.result}")

# Sync later if desired
await client.sync_results(results)
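Locally held results can also be written to disk before you decide whether to upload them. A minimal sketch using only the entity_id and result attributes shown above; how richer result fields serialize is an assumption:

import json

# Save a lightweight snapshot of the un-synced results
with open("results_snapshot.json", "w") as f:
    json.dump(
        [{"entity_id": r.entity_id, "result": r.result} for r in results],
        f,
        indent=2,
        default=str,  # fall back to str() for anything not JSON-serializable
    )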
Using Existing Metrics
Fetch and use metrics already configured in TurnWise:

# Get pipelines for a dataset
pipelines = await client.get_pipelines(dataset_id=1)

for pipeline in pipelines:
    print(f"Pipeline: {pipeline.name}")
    for node in pipeline.nodes:
        print(f"  - {node.name} ({node.evaluation_level})")

# Use an existing metric
existing_metric = Metric(
    name=pipelines[0].nodes[0].name,
    prompt=pipelines[0].nodes[0].prompt,
    evaluation_level=pipelines[0].nodes[0].evaluation_level,
    output_type=pipelines[0].nodes[0].output_type,
    node_id=pipelines[0].nodes[0].id,  # Already registered
    pipeline_id=pipelines[0].id,
)

results = await client.evaluate(
    dataset_id=dataset_id,
    metric=existing_metric,
)
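Indexing pipelines[0].nodes[0] is fine for a quick test, but looking nodes up by name is usually more robust. A small helper over the same objects; the pipeline and node names below are placeholders:

def find_node(pipelines, pipeline_name, node_name):
    # Return (pipeline, node) for the first match, or None if absent.
    for pipeline in pipelines:
        if pipeline.name != pipeline_name:
            continue
        for node in pipeline.nodes:
            if node.name == node_name:
                return pipeline, node
    return None

match = find_node(pipelines, "Quality Checks", "Helpfulness")  # placeholder names
if match:
    pipeline, node = match
    existing_metric = Metric(
        name=node.name,
        prompt=node.prompt,
        evaluation_level=node.evaluation_level,
        output_type=node.output_type,
        node_id=node.id,
        pipeline_id=pipeline.id,
    )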
Custom LLM Providers
Use a custom LLM provider:

from turnwise.llm import LLMProvider

class CustomProvider(LLMProvider):
    async def generate(self, prompt: str, model: str, **kwargs):
        # Your custom implementation
        pass

provider = CustomProvider()
orchestrator = EvaluationOrchestrator(
    llm_provider=provider,
    default_model="your-model",
)
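As a concrete illustration, here is a sketch of a provider that calls any OpenAI-compatible /chat/completions endpoint via httpx. The class name is hypothetical, and the assumption that generate() should return the completion text as a string is not confirmed by the interface above; adjust the return value to whatever LLMProvider actually expects:

import httpx
from turnwise.llm import LLMProvider

class OpenAICompatibleProvider(LLMProvider):  # hypothetical name
    def __init__(self, base_url: str, api_key: str):
        self.base_url = base_url.rstrip("/")
        self.api_key = api_key

    async def generate(self, prompt: str, model: str, **kwargs):
        async with httpx.AsyncClient(timeout=60) as http:
            response = await http.post(
                f"{self.base_url}/chat/completions",
                headers={"Authorization": f"Bearer {self.api_key}"},
                json={
                    "model": model,
                    "messages": [{"role": "user", "content": prompt}],
                    **kwargs,
                },
            )
            response.raise_for_status()
            # Assumption: the orchestrator expects the raw completion text back.
            return response.json()["choices"][0]["message"]["content"]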
Progress Tracking
Track evaluation progress:

def on_progress(progress):
    print(f"Completed: {progress.completed}/{progress.total}")
    print(f"Percentage: {progress.percentage:.1f}%")
    print(f"Current: {progress.current_entity_id}")

results = await client.evaluate(
    dataset_id=dataset_id,
    metric=metric,
    progress_callback=on_progress,
)
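The same callback hook also works with a progress bar library such as tqdm. A sketch that relies only on the completed and total fields shown above:

from tqdm import tqdm

bar = tqdm(total=None, desc="Evaluating")

def on_progress(progress):
    if bar.total is None:
        bar.total = progress.total  # learn the total from the first callback
    bar.n = progress.completed
    bar.refresh()

results = await client.evaluate(
    dataset_id=dataset_id,
    metric=metric,
    progress_callback=on_progress,
)
bar.close()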
Error Handling
Handle errors gracefully:

try:
    results = await client.evaluate(
        dataset_id=dataset_id,
        metric=metric,
    )
except Exception as e:
    print(f"Evaluation failed: {e}")
    # Handle error
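For transient failures such as rate limits or network hiccups, a simple retry with exponential backoff is often enough. This sketch retries the whole call; catching the bare Exception is an assumption you should narrow to whatever errors your provider actually raises:

import asyncio

async def evaluate_with_retry(dataset_id, metric, attempts=3):
    for attempt in range(1, attempts + 1):
        try:
            return await client.evaluate(dataset_id=dataset_id, metric=metric)
        except Exception as e:
            if attempt == attempts:
                raise  # out of retries, surface the error
            delay = 2 ** attempt  # 2s, 4s, 8s, ...
            print(f"Attempt {attempt} failed ({e}); retrying in {delay}s")
            await asyncio.sleep(delay)

results = await evaluate_with_retry(dataset_id, metric)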
Concurrent Evaluation
Control how many conversations are evaluated at the same time:

# Evaluate up to 5 conversations concurrently
results = await client.evaluate(
    dataset_id=dataset_id,
    metric=metric,
    max_concurrent=5,
)
Context Managers
Use context managers for automatic cleanup:

async with TurnWiseClient(
    turnwise_api_key="tw_xxx",
    openrouter_api_key="sk-or-xxx",
) as client:
    await client.verify()
    results = await client.evaluate(
        dataset_id=dataset_id,
        metric=metric,
    )
# Client automatically closed
Filtering Conversations
Evaluate specific conversations:

conversations = await client.get_conversations(dataset_id=1)

# Filter conversations
filtered = [c for c in conversations if c.name.startswith("test_")]

# Evaluate filtered conversations
results = []
for conv in filtered:
    conv_results = await orchestrator.evaluate_conversation(
        conversation=conv,
        metric=metric,
    )
    results.extend(conv_results)

await client.sync_results(results)
Custom Output Processing
Process results before syncing:

results = await client.evaluate(
    dataset_id=dataset_id,
    metric=metric,
    auto_sync=False,
)

# Custom processing
processed_results = []
for result in results:
    # Flag low scores (explicit None check so a score of 0.0 is still flagged)
    score = result.get_score()
    if score is not None and score < 0.5:
        result.metadata = {"flagged": True}
    processed_results.append(result)

# Sync processed results
await client.sync_results(processed_results)
Batch Processing
Process large datasets in batches:

conversations = await client.get_conversations(dataset_id=1)
batch_size = 100

for i in range(0, len(conversations), batch_size):
    batch = conversations[i:i + batch_size]
    print(f"Processing batch {i // batch_size + 1}")

    results = []
    for conv in batch:
        conv_results = await orchestrator.evaluate_conversation(
            conversation=conv,
            metric=metric,
        )
        results.extend(conv_results)

    await client.sync_results(results)
    print(f"Batch {i // batch_size + 1} complete")