| 彗星價格 (USD / M Tokens) | 官方價格 (USD / M Tokens) | 折扣 |
|---|---|---|
輸入:$0.76/M 輸出:$3.19998/M | 輸入:$0.95/M 輸出:$3.999975/M | -20% |
import base64
import json
import os
import subprocess
import tempfile
from pathlib import Path
from openai import OpenAI
# Get your CometAPI key from https://www.cometapi.com/console/token
COMETAPI_KEY = os.environ.get("COMETAPI_KEY") or "<YOUR_COMETAPI_KEY>"
BASE_URL = "https://api.cometapi.com/v1"
MODEL = "kimi-k2.7-code"
VIDEO_PATH = os.environ.get("KIMI_VIDEO_PATH") or str(Path.home() / "Downloads" / "test_video.mp4")
client = OpenAI(base_url=BASE_URL, api_key=COMETAPI_KEY)
tools = [
{
"type": "function",
"function": {
"name": "watch_video_clip",
"description": "Watch a video file or a sub-clip of it. If start_time and end_time are not provided, the entire video will be returned.",
"parameters": {
"type": "object",
"properties": {
"path": {
"type": "string",
"description": "The path to the video file to watch",
},
"start_time": {
"type": "number",
"description": "The start time of the clip in seconds (optional, defaults to 0)",
},
"end_time": {
"type": "number",
"description": "The end time of the clip in seconds (optional, defaults to end of video)",
},
},
"required": ["path"],
},
},
}
]
def video_block(path):
suffix = Path(path).suffix.lstrip(".") or "mp4"
with open(path, "rb") as video_file:
video_base64 = base64.b64encode(video_file.read()).decode("utf-8")
return {"type": "video_url", "video_url": {"url": f"data:video/{suffix};base64,{video_base64}"}}
def watch_video_clip(path, start_time=None, end_time=None):
video_path = Path(path).expanduser()
if not video_path.exists():
raise FileNotFoundError(f"Video file not found: {video_path}")
if start_time is None and end_time is None:
return [
video_block(video_path),
{"type": "text", "text": f"Full video: {video_path.name}"},
]
probe = subprocess.run(
["ffprobe", "-v", "quiet", "-print_format", "json", "-show_format", str(video_path)],
capture_output=True,
text=True,
check=True,
)
duration = float(json.loads(probe.stdout)["format"]["duration"])
start_time = start_time or 0
end_time = end_time or duration
clip_duration = end_time - start_time
with tempfile.NamedTemporaryFile(suffix=video_path.suffix or ".mp4", delete=False) as tmp:
tmp_path = tmp.name
try:
subprocess.run(
[
"ffmpeg",
"-y",
"-ss",
str(start_time),
"-i",
str(video_path),
"-t",
str(clip_duration),
"-c:v",
"libx264",
"-c:a",
"aac",
"-preset",
"fast",
"-crf",
"23",
"-movflags",
"+faststart",
"-loglevel",
"error",
tmp_path,
],
check=True,
)
return [
video_block(tmp_path),
{"type": "text", "text": f"Clip from {video_path.name}: {start_time}s - {end_time}s"},
]
finally:
os.unlink(tmp_path)
def agent_loop(user_message):
messages = [
{
"role": "system",
"content": "You are a video analysis assistant. Use watch_video_clip to examine specific portions of videos.",
},
{"role": "user", "content": user_message},
]
while True:
response = client.chat.completions.create(
model=MODEL,
messages=messages,
tools=tools,
tool_choice="auto",
)
message = response.choices[0].message
messages.append(message.model_dump(exclude_none=True))
if not message.tool_calls:
return message.content or response.model_dump_json(indent=2)
for tool_call in message.tool_calls:
if tool_call.function.name != "watch_video_clip":
continue
args = json.loads(tool_call.function.arguments or "{}")
messages.append(
{
"role": "tool",
"tool_call_id": tool_call.id,
"content": watch_video_clip(
args["path"],
start_time=args.get("start_time"),
end_time=args.get("end_time"),
),
}
)
if __name__ == "__main__":
if not Path(VIDEO_PATH).expanduser().exists():
raise SystemExit(f"Set KIMI_VIDEO_PATH to a local video file before running this example: {VIDEO_PATH}")
answer = agent_loop(f"Analyze what happens between seconds 8-13 in {VIDEO_PATH}")
print(answer)import base64
import json
import os
import subprocess
import tempfile
from pathlib import Path
from openai import OpenAI
# Get your CometAPI key from https://www.cometapi.com/console/token
COMETAPI_KEY = os.environ.get("COMETAPI_KEY") or "<YOUR_COMETAPI_KEY>"
BASE_URL = "https://api.cometapi.com/v1"
MODEL = "kimi-k2.7-code"
VIDEO_PATH = os.environ.get("KIMI_VIDEO_PATH") or str(Path.home() / "Downloads" / "test_video.mp4")
client = OpenAI(base_url=BASE_URL, api_key=COMETAPI_KEY)
tools = [
{
"type": "function",
"function": {
"name": "watch_video_clip",
"description": "Watch a video file or a sub-clip of it. If start_time and end_time are not provided, the entire video will be returned.",
"parameters": {
"type": "object",
"properties": {
"path": {
"type": "string",
"description": "The path to the video file to watch",
},
"start_time": {
"type": "number",
"description": "The start time of the clip in seconds (optional, defaults to 0)",
},
"end_time": {
"type": "number",
"description": "The end time of the clip in seconds (optional, defaults to end of video)",
},
},
"required": ["path"],
},
},
}
]
def video_block(path):
suffix = Path(path).suffix.lstrip(".") or "mp4"
with open(path, "rb") as video_file:
video_base64 = base64.b64encode(video_file.read()).decode("utf-8")
return {"type": "video_url", "video_url": {"url": f"data:video/{suffix};base64,{video_base64}"}}
def watch_video_clip(path, start_time=None, end_time=None):
video_path = Path(path).expanduser()
if not video_path.exists():
raise FileNotFoundError(f"Video file not found: {video_path}")
if start_time is None and end_time is None:
return [
video_block(video_path),
{"type": "text", "text": f"Full video: {video_path.name}"},
]
probe = subprocess.run(
["ffprobe", "-v", "quiet", "-print_format", "json", "-show_format", str(video_path)],
capture_output=True,
text=True,
check=True,
)
duration = float(json.loads(probe.stdout)["format"]["duration"])
start_time = start_time or 0
end_time = end_time or duration
clip_duration = end_time - start_time
with tempfile.NamedTemporaryFile(suffix=video_path.suffix or ".mp4", delete=False) as tmp:
tmp_path = tmp.name
try:
subprocess.run(
[
"ffmpeg",
"-y",
"-ss",
str(start_time),
"-i",
str(video_path),
"-t",
str(clip_duration),
"-c:v",
"libx264",
"-c:a",
"aac",
"-preset",
"fast",
"-crf",
"23",
"-movflags",
"+faststart",
"-loglevel",
"error",
tmp_path,
],
check=True,
)
return [
video_block(tmp_path),
{"type": "text", "text": f"Clip from {video_path.name}: {start_time}s - {end_time}s"},
]
finally:
os.unlink(tmp_path)
def agent_loop(user_message):
messages = [
{
"role": "system",
"content": "You are a video analysis assistant. Use watch_video_clip to examine specific portions of videos.",
},
{"role": "user", "content": user_message},
]
while True:
response = client.chat.completions.create(
model=MODEL,
messages=messages,
tools=tools,
tool_choice="auto",
)
message = response.choices[0].message
messages.append(message.model_dump(exclude_none=True))
if not message.tool_calls:
return message.content or response.model_dump_json(indent=2)
for tool_call in message.tool_calls:
if tool_call.function.name != "watch_video_clip":
continue
args = json.loads(tool_call.function.arguments or "{}")
messages.append(
{
"role": "tool",
"tool_call_id": tool_call.id,
"content": watch_video_clip(
args["path"],
start_time=args.get("start_time"),
end_time=args.get("end_time"),
),
}
)
if __name__ == "__main__":
if not Path(VIDEO_PATH).expanduser().exists():
raise SystemExit(f"Set KIMI_VIDEO_PATH to a local video file before running this example: {VIDEO_PATH}")
answer = agent_loop(f"Analyze what happens between seconds 8-13 in {VIDEO_PATH}")
print(answer)
import { execFileSync } from "node:child_process";
import fs from "node:fs";
import os from "node:os";
import path from "node:path";
import OpenAI from "openai";
// Get your CometAPI key from https://www.cometapi.com/console/token
const COMETAPI_KEY = process.env.COMETAPI_KEY || "<YOUR_COMETAPI_KEY>";
const BASE_URL = "https://api.cometapi.com/v1";
const MODEL = "kimi-k2.7-code";
const VIDEO_PATH = process.env.KIMI_VIDEO_PATH || path.join(os.homedir(), "Downloads", "test_video.mp4");
const client = new OpenAI({
apiKey: COMETAPI_KEY,
baseURL: BASE_URL,
});
const tools = [
{
type: "function",
function: {
name: "watch_video_clip",
description:
"Watch a video file or a sub-clip of it. If start_time and end_time are not provided, the entire video will be returned.",
parameters: {
type: "object",
properties: {
path: {
type: "string",
description: "The path to the video file to watch",
},
start_time: {
type: "number",
description: "The start time of the clip in seconds (optional, defaults to 0)",
},
end_time: {
type: "number",
description: "The end time of the clip in seconds (optional, defaults to end of video)",
},
},
required: ["path"],
},
},
},
];
function videoBlock(videoPath) {
const suffix = path.extname(videoPath).slice(1) || "mp4";
const videoBase64 = fs.readFileSync(videoPath).toString("base64");
return { type: "video_url", video_url: { url: `data:video/${suffix};base64,${videoBase64}` } };
}
function watchVideoClip(videoPath, startTime = null, endTime = null) {
const resolvedPath = path.resolve(videoPath.replace(/^~(?=$|\/|\\)/, os.homedir()));
if (!fs.existsSync(resolvedPath)) {
throw new Error(`Video file not found: ${resolvedPath}`);
}
if (startTime === null && endTime === null) {
return [videoBlock(resolvedPath), { type: "text", text: `Full video: ${path.basename(resolvedPath)}` }];
}
const probe = execFileSync("ffprobe", ["-v", "quiet", "-print_format", "json", "-show_format", resolvedPath], {
encoding: "utf8",
});
const duration = Number(JSON.parse(probe).format.duration);
const start = startTime ?? 0;
const end = endTime ?? duration;
const tempDir = fs.mkdtempSync(path.join(os.tmpdir(), "kimi-video-"));
const tempPath = path.join(tempDir, `clip${path.extname(resolvedPath) || ".mp4"}`);
try {
execFileSync("ffmpeg", [
"-y",
"-ss",
String(start),
"-i",
resolvedPath,
"-t",
String(end - start),
"-c:v",
"libx264",
"-c:a",
"aac",
"-preset",
"fast",
"-crf",
"23",
"-movflags",
"+faststart",
"-loglevel",
"error",
tempPath,
]);
return [
videoBlock(tempPath),
{ type: "text", text: `Clip from ${path.basename(resolvedPath)}: ${start}s - ${end}s` },
];
} finally {
fs.rmSync(tempDir, { recursive: true, force: true });
}
}
async function agentLoop(userMessage) {
const messages = [
{
role: "system",
content: "You are a video analysis assistant. Use watch_video_clip to examine specific portions of videos.",
},
{ role: "user", content: userMessage },
];
while (true) {
const response = await client.chat.completions.create({
model: MODEL,
messages,
tools,
tool_choice: "auto",
});
const message = response.choices[0].message;
messages.push(message);
if (!message.tool_calls?.length) {
return message.content || JSON.stringify(response, null, 2);
}
for (const toolCall of message.tool_calls) {
if (toolCall.function.name !== "watch_video_clip") continue;
const args = JSON.parse(toolCall.function.arguments || "{}");
messages.push({
role: "tool",
tool_call_id: toolCall.id,
content: watchVideoClip(args.path, args.start_time, args.end_time),
});
}
}
}
if (!fs.existsSync(path.resolve(VIDEO_PATH.replace(/^~(?=$|\/|\\)/, os.homedir())))) {
throw new Error(`Set KIMI_VIDEO_PATH to a local video file before running this example: ${VIDEO_PATH}`);
}
console.log(await agentLoop(`Analyze what happens between seconds 8-13 in ${VIDEO_PATH}`));
#!/bin/bash
# Get your CometAPI key from https://www.cometapi.com/console/token
# Export it as: export COMETAPI_KEY="your-key-here"
# Optionally set a local video path: export KIMI_VIDEO_PATH="$HOME/Downloads/test_video.mp4"
VIDEO_PATH="${KIMI_VIDEO_PATH:-$HOME/Downloads/test_video.mp4}"
curl -s https://api.cometapi.com/v1/chat/completions \
-H "Content-Type: application/json" \
-H "Authorization: Bearer $COMETAPI_KEY" \
-d "{
\"model\": \"kimi-k2.7-code\",
\"messages\": [
{
\"role\": \"system\",
\"content\": \"You are a video analysis assistant. Use watch_video_clip to examine specific portions of videos.\"
},
{
\"role\": \"user\",
\"content\": \"Analyze what happens between seconds 8-13 in ${VIDEO_PATH}\"
}
],
\"tools\": [
{
\"type\": \"function\",
\"function\": {
\"name\": \"watch_video_clip\",
\"description\": \"Watch a video file or a sub-clip of it. If start_time and end_time are not provided, the entire video will be returned.\",
\"parameters\": {
\"type\": \"object\",
\"properties\": {
\"path\": {
\"type\": \"string\",
\"description\": \"The path to the video file to watch\"
},
\"start_time\": {
\"type\": \"number\",
\"description\": \"The start time of the clip in seconds (optional, defaults to 0)\"
},
\"end_time\": {
\"type\": \"number\",
\"description\": \"The end time of the clip in seconds (optional, defaults to end of video)\"
}
},
\"required\": [\"path\"]
}
}
}
],
\"tool_choice\": \"auto\"
}"
| version |
|---|
| kimi-k2.7-code |