DaVinci MCP Professional v2.1.1
A modern, professional Model Context Protocol server for DaVinci Resolve integration
Loading...
Searching...
No Matches
resolve_client.py
Go to the documentation of this file.
1"""
2DaVinci Resolve client wrapper.
3
4Provides a clean interface to the DaVinci Resolve API with proper error handling
5and logging.
6"""
7
8import logging
9from typing import Any, Dict, List, Optional, Union
10
11from .utils.platform import setup_resolve_environment, check_resolve_running
12
13
14logger = logging.getLogger(__name__)
15
16
17class DaVinciResolveError(Exception):
18 """Base exception for DaVinci Resolve related errors."""
19 pass
20
21
23 """Raised when DaVinci Resolve is not running."""
24 pass
25
26
28 """Raised when connection to DaVinci Resolve fails."""
29 pass
30
31
33 """
34 A clean interface to the DaVinci Resolve API.
35
36 This class handles the connection to DaVinci Resolve and provides
37 organized methods for interacting with projects, timelines, media, etc.
38 """
39
40 def __init__(self) -> None:
41 self._resolve: Optional[Any] = None
42 self._project_manager: Optional[Any] = None
43 self._current_project: Optional[Any] = None
44 self._is_connected = False
45
46 def connect(self) -> None:
47 """Connect to DaVinci Resolve."""
48 # Check if Resolve is running
49 if not check_resolve_running():
51 "DaVinci Resolve is not running. Please start DaVinci Resolve first."
52 )
53
54 # Set up environment
55 if not setup_resolve_environment():
57 "Failed to set up DaVinci Resolve environment variables."
58 )
59
60 try:
61 # Import and connect to Resolve
62 import DaVinciResolveScript as dvr_script
63 self._resolve = dvr_script.scriptapp("Resolve")
64
65 if self._resolve is None:
67 "Failed to get Resolve object. Check that DaVinci Resolve is running."
68 )
69
70 # Get project manager
71 self._project_manager = self._resolve.GetProjectManager()
72 if self._project_manager is None:
73 raise DaVinciResolveConnectionError("Failed to get Project Manager.")
74
75 # Get current project if one is open
76 self._current_project = self._project_manager.GetCurrentProject()
77
78 self._is_connected = True
79 logger.info(f"Connected to {self.get_version()}")
80
81 except ImportError as e:
83 f"Failed to import DaVinciResolveScript: {e}. "
84 "Check environment variables and DaVinci Resolve installation."
85 )
86 except Exception as e:
87 raise DaVinciResolveConnectionError(f"Unexpected error connecting: {e}")
88
89 def disconnect(self) -> None:
90 """Disconnect from DaVinci Resolve."""
91 self._resolve = None
92 self._project_manager = None
93 self._current_project = None
94 self._is_connected = False
95 logger.info("Disconnected from DaVinci Resolve")
96
97 def is_connected(self) -> bool:
98 """Check if connected to DaVinci Resolve."""
99 return self._is_connected
100
101 def _ensure_connected(self) -> None:
102 """Ensure we're connected to Resolve."""
103 if not self._is_connected:
104 raise DaVinciResolveConnectionError("Not connected to DaVinci Resolve")
105
106 def _ensure_project(self) -> Any:
107 """Ensure we have a current project."""
108 self._ensure_connected()
109
110 # Refresh current project
111 if self._project_manager:
112 self._current_project = self._project_manager.GetCurrentProject()
113
114 if self._current_project is None:
115 raise DaVinciResolveError("No project is currently open")
116
117 return self._current_project
118
119 # System Information
120 def get_version(self) -> str:
121 """Get DaVinci Resolve version."""
122 self._ensure_connected()
123 if self._resolve:
124 return f"{self._resolve.GetProductName()} {self._resolve.GetVersionString()}"
125 return "Unknown"
126
127 def get_current_page(self) -> str:
128 """Get the current page (Edit, Color, Fusion, etc.)."""
129 self._ensure_connected()
130 if self._resolve:
131 return self._resolve.GetCurrentPage()
132 return "Unknown"
133
134 def switch_page(self, page: str) -> bool:
135 """Switch to a specific page."""
136 self._ensure_connected()
137
138 valid_pages = ['media', 'cut', 'edit', 'fusion', 'color', 'fairlight', 'deliver']
139 if page.lower() not in valid_pages:
140 raise ValueError(f"Invalid page. Must be one of: {', '.join(valid_pages)}")
141
142 if self._resolve:
143 return bool(self._resolve.OpenPage(page.lower()))
144 return False
145
146 # Project Management
147 def list_projects(self) -> List[str]:
148 """List all projects in the current database."""
149 self._ensure_connected()
150
151 if self._project_manager:
152 projects = self._project_manager.GetProjectListInCurrentFolder()
153 return [p for p in projects if p] # Filter out empty strings
154 return []
155
156 def get_current_project_name(self) -> Optional[str]:
157 """Get the name of the currently open project."""
158 try:
159 project = self._ensure_project()
160 return project.GetName()
161 except DaVinciResolveError:
162 return None
163
164 def open_project(self, name: str) -> bool:
165 """Open a project by name."""
166 self._ensure_connected()
167
168 if not self._project_manager:
169 return False
170
171 # Check if project exists
172 projects = self.list_projects()
173 if name not in projects:
174 raise ValueError(f"Project '{name}' not found. Available: {', '.join(projects)}")
175
176 result = self._project_manager.LoadProject(name)
177 if result:
178 self._current_project = self._project_manager.GetCurrentProject()
179 logger.info(f"Opened project: {name}")
180
181 return bool(result)
182
183 def create_project(self, name: str) -> bool:
184 """Create a new project."""
185 self._ensure_connected()
186
187 if not self._project_manager:
188 return False
189
190 # Check if project already exists
191 projects = self.list_projects()
192 if name in projects:
193 raise ValueError(f"Project '{name}' already exists")
194
195 result = self._project_manager.CreateProject(name)
196 if result:
197 self._current_project = self._project_manager.GetCurrentProject()
198 logger.info(f"Created project: {name}")
199
200 return bool(result)
201
202 # Timeline Management
203 def list_timelines(self) -> List[str]:
204 """List all timelines in the current project."""
205 project = self._ensure_project()
206
207 timeline_count = project.GetTimelineCount()
208 timelines = []
209
210 for i in range(1, timeline_count + 1):
211 timeline = project.GetTimelineByIndex(i)
212 if timeline:
213 timelines.append(timeline.GetName())
214
215 return timelines
216
217 def get_current_timeline_name(self) -> Optional[str]:
218 """Get the name of the current timeline."""
219 try:
220 project = self._ensure_project()
221 current_timeline = project.GetCurrentTimeline()
222 return current_timeline.GetName() if current_timeline else None
223 except DaVinciResolveError:
224 return None
225
226 def create_timeline(self, name: str) -> bool:
227 """Create a new timeline."""
228 project = self._ensure_project()
229
230 media_pool = project.GetMediaPool()
231 if not media_pool:
232 raise DaVinciResolveError("Failed to get Media Pool")
233
234 timeline = media_pool.CreateEmptyTimeline(name)
235 if timeline:
236 logger.info(f"Created timeline: {name}")
237 return True
238
239 return False
240
241 def switch_timeline(self, name: str) -> bool:
242 """Switch to a timeline by name."""
243 project = self._ensure_project()
244
245 # Find timeline by name
246 timeline_count = project.GetTimelineCount()
247 for i in range(1, timeline_count + 1):
248 timeline = project.GetTimelineByIndex(i)
249 if timeline and timeline.GetName() == name:
250 result = project.SetCurrentTimeline(timeline)
251 if result:
252 logger.info(f"Switched to timeline: {name}")
253 return bool(result)
254
255 raise ValueError(f"Timeline '{name}' not found")
256
257 # Media Pool Management
258 def list_media_clips(self) -> List[Dict[str, Any]]:
259 """List all clips in the media pool root folder."""
260 project = self._ensure_project()
261
262 media_pool = project.GetMediaPool()
263 if not media_pool:
264 raise DaVinciResolveError("Failed to get Media Pool")
265
266 root_folder = media_pool.GetRootFolder()
267 if not root_folder:
268 raise DaVinciResolveError("Failed to get root folder")
269
270 clips = root_folder.GetClipList()
271 if not clips:
272 return []
273
274 result = []
275 for clip in clips:
276 result.append({
277 "name": clip.GetName(),
278 "duration": clip.GetDuration(),
279 "fps": clip.GetClipProperty("FPS") or "Unknown"
280 })
281
282 return result
283
284 def import_media(self, file_path: str) -> bool:
285 """Import a media file into the media pool."""
286 project = self._ensure_project()
287
288 media_pool = project.GetMediaPool()
289 if not media_pool:
290 raise DaVinciResolveError("Failed to get Media Pool")
291
292 # Import the media file
293 imported_clips = media_pool.ImportMedia([file_path])
294
295 if imported_clips:
296 logger.info(f"Imported media: {file_path}")
297 return True
298
299 return False