Skip to content

Client

The Client class is the main entry point for interacting with the MicroDC platform.

Client

Main client for interacting with MicroDC.ai platform.

Attributes:

Name Type Description
api_key str

MicroDC API key for authentication

base_url str

Base URL for MicroDC API

timeout int

Default request timeout in seconds

verify_ssl bool

Whether to verify SSL certificates

Source code in microdc/client.py
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
class Client:
    """
    Main client for interacting with MicroDC.ai platform.

    Attributes:
        api_key (str): MicroDC API key for authentication
        base_url (str): Base URL for MicroDC API
        timeout (int): Default request timeout in seconds
        verify_ssl (bool): Whether to verify SSL certificates
    """

    def __init__(
        self,
        api_key: str,
        base_url: str = "https://api.microdc.ai",
        timeout: int = 30,
        verify_ssl: bool = True,
        auto_start_polling: bool = True,
    ):
        """
        Initialize MicroDC client.

        Args:
            api_key: API key starting with "mDC_"
            base_url: API endpoint URL
            timeout: Request timeout in seconds
            verify_ssl: Enable SSL certificate verification
            auto_start_polling: Automatically start polling thread
        """
        self.api_key = api_key
        self.base_url = base_url.rstrip("/")
        self.timeout = timeout
        self.verify_ssl = verify_ssl

        # HTTP transport
        self._transport = HTTPTransport(
            api_key=api_key,
            base_url=base_url,
            timeout=timeout,
            verify_ssl=verify_ssl,
        )

        # Polling manager
        self._polling = PollingManager(poll_interval=2.0)
        self._polling.set_job_fetcher(self._fetch_job_details)

        if auto_start_polling:
            self._polling.start()

    def set_callback(self, callback: Callable[["Client", str], None]) -> None:
        """
        Set callback function to be called when jobs complete.

        Args:
            callback: Function that takes (Client, job_id) as arguments
        """

        def wrapper(job_id: str) -> None:
            callback(self, job_id)

        self._polling.set_callback(wrapper)

    def send_job(self, job: BaseCall) -> str:
        """
        Submit a job to the MicroDC platform.

        Args:
            job: Job object (LLMComplete, LLMChat, LLMEmbed, DocumentCall, etc.)

        Returns:
            str: Job ID

        Raises:
            AuthenticationError: Invalid API key
            ValidationError: Invalid job configuration
            APIError: Server error
        """
        # Validate job before submission
        job.validate()

        # All jobs submit to the unified endpoint
        endpoint = "/api/v1/jobs/submit"

        # Submit job
        payload = job.to_api_payload()
        response = self._transport.request("POST", endpoint, json=payload)

        # Extract job ID - JobResponse uses 'id' field (UUID)
        job_id = str(response.get("id") or response.get("job_id"))

        # Track job
        job._job_id = job_id
        job._submitted_at = datetime.now()
        self._polling.track_job(job_id, job)

        return job_id

    def get_job_details(self, job_id: str) -> JobDetails:
        """
        Retrieve detailed information about a job.

        Args:
            job_id: Unique job identifier

        Returns:
            JobDetails: Object containing job status, results, and metadata
        """
        # Check if we already have the completed job cached
        cached = self._polling.get_completed_job(job_id)
        if cached is not None:
            return cached

        # Fetch from API
        details = self._fetch_job_details(job_id)

        # Cache if completed
        if details.is_completed():
            self._polling.cache_completed_job(job_id, details)

        return details

    def _fetch_job_details(self, job_id: str) -> JobDetails:
        """
        Fetch job details from API (internal method).

        Args:
            job_id: Unique job identifier

        Returns:
            JobDetails: Parsed job details
        """
        response = self._transport.request("GET", f"/api/v1/jobs/{job_id}")
        return self._parse_job_details(response)

    def get_job_status(self, job_id: str) -> str:
        """
        Get current status of a job.

        Args:
            job_id: Unique job identifier

        Returns:
            str: Status (queued, processing, completed, failed, cancelled)
        """
        response = self._transport.request("GET", f"/api/v1/jobs/{job_id}/status")
        return str(response["status"])

    def cancel_job(self, job_id: str) -> bool:
        """
        Cancel a pending or processing job.

        Args:
            job_id: Unique job identifier

        Returns:
            bool: True if cancellation successful
        """
        response = self._transport.request("DELETE", f"/api/v1/jobs/{job_id}")
        return bool(response.get("cancelled", False))

    def acknowledge_job(self, job_id: str) -> Dict[str, Any]:
        """
        Acknowledge that a job has been completed and processed by the client.

        This is useful for tracking which jobs have been seen/handled by the client,
        especially when using callbacks or polling mechanisms.

        Args:
            job_id: Unique job identifier

        Returns:
            Dict[str, Any]: Acknowledgment details containing:
                - message: Success message
                - job_id: The acknowledged job ID
                - acknowledged_at: Timestamp when acknowledged (ISO format)

        Raises:
            JobNotFoundError: If job does not exist
            APIError: If acknowledgment fails

        Example:
            >>> result = client.acknowledge_job(job_id)
            >>> print(f"Acknowledged at: {result['acknowledged_at']}")
        """
        response = self._transport.request("POST", f"/api/v1/jobs/{job_id}/acknowledge")
        return response

    def list_jobs(
        self,
        status: Optional[str] = None,
        limit: int = 100,
        offset: int = 0,
    ) -> List[JobDetails]:
        """
        List jobs for the authenticated user.

        Args:
            status: Filter by status (queued, processing, completed, failed)
            limit: Maximum number of jobs to return
            offset: Pagination offset

        Returns:
            List[JobDetails]: List of job details
        """
        params: Dict[str, Any] = {"limit": limit, "offset": offset}
        if status:
            params["status"] = status

        response = self._transport.request("GET", "/api/v1/jobs", params=params)
        jobs = response.get("jobs", [])

        return [self._parse_job_details(job_data) for job_data in jobs]

    def wait_for_all(self, timeout: Optional[float] = None) -> None:
        """
        Block until all pending jobs complete.

        Args:
            timeout: Maximum seconds to wait (None = wait forever)

        Raises:
            TimeoutError: If timeout exceeded
        """
        start_time = time.time()

        while True:
            if not self._polling.has_pending_jobs():
                return

            if timeout is not None:
                elapsed = time.time() - start_time
                if elapsed >= timeout:
                    raise TimeoutError(f"Timeout waiting for jobs after {timeout}s")

            time.sleep(0.1)

    def wait_for_job(self, job_id: str, timeout: Optional[float] = None) -> JobDetails:
        """
        Block until specific job completes.

        Args:
            job_id: Unique job identifier
            timeout: Maximum seconds to wait (None = wait forever)

        Returns:
            JobDetails: Completed job details

        Raises:
            TimeoutError: If timeout exceeded
        """
        start_time = time.time()

        while True:
            details = self.get_job_details(job_id)
            if details.is_completed():
                return details

            if timeout is not None:
                elapsed = time.time() - start_time
                if elapsed >= timeout:
                    raise TimeoutError(f"Timeout waiting for job {job_id} after {timeout}s")

            time.sleep(2.0)

    def _parse_job_details(self, data: Dict[str, Any]) -> JobDetails:
        """
        Parse API response (JobResponse schema) into JobDetails object.

        Args:
            data: Raw API response data matching JobResponse schema

        Returns:
            JobDetails: Parsed job details
        """
        # Handle both 'job_id' and 'id' field names for compatibility
        job_id = str(data.get("id") or data.get("job_id"))

        # Parse datetime fields safely
        def parse_datetime(value: Any) -> Optional[datetime]:
            if value is None:
                return None
            if isinstance(value, datetime):
                return value
            return datetime.fromisoformat(str(value).replace("Z", "+00:00"))

        # import IPython
        # IPython.embed()

        # Parse created_at - required field
        created_at = parse_datetime(data["created_at"])
        if created_at is None:
            # Fallback to current time if parsing fails
            created_at = datetime.now()

        return JobDetails(
            job_id=job_id,
            type=data["type"],
            status=data["status"],
            model=data["model"],
            created_at=created_at,
            started_at=parse_datetime(data.get("started_at")),
            completed_at=parse_datetime(data.get("completed_at")),
            estimated_cost=data.get("estimated_cost"),
            actual_cost=data.get("actual_cost"),
            result=data.get("result"),
            error_message=data.get("error_message"),
            metadata=data.get("metadata", {}),
            priority=data.get("priority", "standard"),
            retry_count=data.get("retry_count", 0),
            user_id=str(data["user_id"]) if data.get("user_id") else None,
        )

    def upload_file(
        self,
        file_path: str,
        description: Optional[str] = None,
    ) -> Dict[str, Any]:
        """
        Upload a file to MicroDC platform.

        Args:
            file_path: Path to file to upload
            description: Optional description of the file

        Returns:
            Dict containing file metadata (id, original_name, file_path, file_size)

        Raises:
            APIError: Upload failed
        """
        import os

        filename = os.path.basename(file_path)

        # Read file content
        with open(file_path, "rb") as f:
            file_content = f.read()

        # Prepare multipart form data
        # When using httpx, we pass files as tuples and any additional form fields via data
        files = {"file": (filename, file_content, "application/pdf")}

        # Add description as form data if provided
        data = {"description": description} if description else {}

        response = self._transport.request(
            "POST",
            "/api/files/upload",
            files=files,
            data=data if data else None,
        )

        return response

    def delete_file(
        self,
        file_id: str,
        permanent: bool = True,
    ) -> Dict[str, Any]:
        """
        Delete an uploaded file.

        Args:
            file_id: ID of the file to delete
            permanent: If True, permanently removes the file from disk.
                      If False, soft-deletes (can be recovered by admin).
                      Defaults to True.

        Returns:
            Dict containing confirmation message

        Example:
            >>> client.delete_file(file_id="550e8400-...")
            {'message': 'File deleted successfully'}

        Raises:
            APIError: File not found or not owned by user
        """
        params = {"permanent": permanent}
        response = self._transport.request(
            "DELETE",
            f"/api/files/{file_id}",
            params=params,
        )
        return response

    def create_download_token(
        self,
        file_id: str,
        expires_in_minutes: int = 60,
        max_uses: int = 1,
    ) -> Dict[str, Any]:
        """
        Create a one-time download token for a file.

        Args:
            file_id: ID of uploaded file
            expires_in_minutes: Token expiration time (1-10080 minutes, default 60)
            max_uses: Maximum number of uses (1-100, default 1)

        Returns:
            Dict containing token, download_url, and expires_at

        Raises:
            APIError: Token creation failed
        """
        params = {
            "expires_in_minutes": expires_in_minutes,
            "max_uses": max_uses,
        }

        response = self._transport.request(
            "POST",
            f"/api/files/{file_id}/create-download-token",
            params=params,
        )

        return response

    def upload_and_tokenize(
        self,
        file_path: str,
        description: Optional[str] = None,
        expires_in_minutes: int = 60,
        max_uses: int = 1,
    ) -> Dict[str, Any]:
        """
        Upload file and create download token in one step.

        Args:
            file_path: Path to file to upload
            description: Optional description of the file
            expires_in_minutes: Token expiration time (default 60)
            max_uses: Maximum number of uses (default 1)

        Returns:
            Dict containing file metadata and download token

        Raises:
            APIError: Upload or token creation failed
        """
        # Upload file
        file_data = self.upload_file(file_path, description)
        file_id = file_data["id"]

        # Create download token
        token_data = self.create_download_token(file_id, expires_in_minutes, max_uses)

        # Combine results
        return {
            "file_id": file_id,
            "file_metadata": file_data,
            "token": token_data["token"],
            "download_url": token_data["download_url"],
            "expires_at": token_data["expires_at"],
        }

    def close(self) -> None:
        """Clean up resources and stop polling thread."""
        self._polling.stop()
        self._transport.close()

    def __enter__(self) -> "Client":
        """Context manager entry."""
        return self

    def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
        """Context manager exit."""
        self.close()

__init__(api_key, base_url='https://api.microdc.ai', timeout=30, verify_ssl=True, auto_start_polling=True)

Initialize MicroDC client.

Parameters:

Name Type Description Default
api_key str

API key starting with "mDC_"

required
base_url str

API endpoint URL

'https://api.microdc.ai'
timeout int

Request timeout in seconds

30
verify_ssl bool

Enable SSL certificate verification

True
auto_start_polling bool

Automatically start polling thread

True
Source code in microdc/client.py
def __init__(
    self,
    api_key: str,
    base_url: str = "https://api.microdc.ai",
    timeout: int = 30,
    verify_ssl: bool = True,
    auto_start_polling: bool = True,
):
    """
    Initialize MicroDC client.

    Args:
        api_key: API key starting with "mDC_"
        base_url: API endpoint URL
        timeout: Request timeout in seconds
        verify_ssl: Enable SSL certificate verification
        auto_start_polling: Automatically start polling thread
    """
    self.api_key = api_key
    self.base_url = base_url.rstrip("/")
    self.timeout = timeout
    self.verify_ssl = verify_ssl

    # HTTP transport
    self._transport = HTTPTransport(
        api_key=api_key,
        base_url=base_url,
        timeout=timeout,
        verify_ssl=verify_ssl,
    )

    # Polling manager
    self._polling = PollingManager(poll_interval=2.0)
    self._polling.set_job_fetcher(self._fetch_job_details)

    if auto_start_polling:
        self._polling.start()

send_job(job)

Submit a job to the MicroDC platform.

Parameters:

Name Type Description Default
job BaseCall

Job object (LLMComplete, LLMChat, LLMEmbed, DocumentCall, etc.)

required

Returns:

Name Type Description
str str

Job ID

Raises:

Type Description
AuthenticationError

Invalid API key

ValidationError

Invalid job configuration

APIError

Server error

Source code in microdc/client.py
def send_job(self, job: BaseCall) -> str:
    """
    Submit a job to the MicroDC platform.

    Args:
        job: Job object (LLMComplete, LLMChat, LLMEmbed, DocumentCall, etc.)

    Returns:
        str: Job ID

    Raises:
        AuthenticationError: Invalid API key
        ValidationError: Invalid job configuration
        APIError: Server error
    """
    # Validate job before submission
    job.validate()

    # All jobs submit to the unified endpoint
    endpoint = "/api/v1/jobs/submit"

    # Submit job
    payload = job.to_api_payload()
    response = self._transport.request("POST", endpoint, json=payload)

    # Extract job ID - JobResponse uses 'id' field (UUID)
    job_id = str(response.get("id") or response.get("job_id"))

    # Track job
    job._job_id = job_id
    job._submitted_at = datetime.now()
    self._polling.track_job(job_id, job)

    return job_id

get_job_details(job_id)

Retrieve detailed information about a job.

Parameters:

Name Type Description Default
job_id str

Unique job identifier

required

Returns:

Name Type Description
JobDetails JobDetails

Object containing job status, results, and metadata

Source code in microdc/client.py
def get_job_details(self, job_id: str) -> JobDetails:
    """
    Retrieve detailed information about a job.

    Args:
        job_id: Unique job identifier

    Returns:
        JobDetails: Object containing job status, results, and metadata
    """
    # Check if we already have the completed job cached
    cached = self._polling.get_completed_job(job_id)
    if cached is not None:
        return cached

    # Fetch from API
    details = self._fetch_job_details(job_id)

    # Cache if completed
    if details.is_completed():
        self._polling.cache_completed_job(job_id, details)

    return details

get_job_status(job_id)

Get current status of a job.

Parameters:

Name Type Description Default
job_id str

Unique job identifier

required

Returns:

Name Type Description
str str

Status (queued, processing, completed, failed, cancelled)

Source code in microdc/client.py
def get_job_status(self, job_id: str) -> str:
    """
    Get current status of a job.

    Args:
        job_id: Unique job identifier

    Returns:
        str: Status (queued, processing, completed, failed, cancelled)
    """
    response = self._transport.request("GET", f"/api/v1/jobs/{job_id}/status")
    return str(response["status"])

cancel_job(job_id)

Cancel a pending or processing job.

Parameters:

Name Type Description Default
job_id str

Unique job identifier

required

Returns:

Name Type Description
bool bool

True if cancellation successful

Source code in microdc/client.py
def cancel_job(self, job_id: str) -> bool:
    """
    Cancel a pending or processing job.

    Args:
        job_id: Unique job identifier

    Returns:
        bool: True if cancellation successful
    """
    response = self._transport.request("DELETE", f"/api/v1/jobs/{job_id}")
    return bool(response.get("cancelled", False))

acknowledge_job(job_id)

Acknowledge that a job has been completed and processed by the client.

This is useful for tracking which jobs have been seen/handled by the client, especially when using callbacks or polling mechanisms.

Parameters:

Name Type Description Default
job_id str

Unique job identifier

required

Returns:

Type Description
Dict[str, Any]

Dict[str, Any]: Acknowledgment details containing: - message: Success message - job_id: The acknowledged job ID - acknowledged_at: Timestamp when acknowledged (ISO format)

Raises:

Type Description
JobNotFoundError

If job does not exist

APIError

If acknowledgment fails

Example

result = client.acknowledge_job(job_id) print(f"Acknowledged at: {result['acknowledged_at']}")

Source code in microdc/client.py
def acknowledge_job(self, job_id: str) -> Dict[str, Any]:
    """
    Acknowledge that a job has been completed and processed by the client.

    This is useful for tracking which jobs have been seen/handled by the client,
    especially when using callbacks or polling mechanisms.

    Args:
        job_id: Unique job identifier

    Returns:
        Dict[str, Any]: Acknowledgment details containing:
            - message: Success message
            - job_id: The acknowledged job ID
            - acknowledged_at: Timestamp when acknowledged (ISO format)

    Raises:
        JobNotFoundError: If job does not exist
        APIError: If acknowledgment fails

    Example:
        >>> result = client.acknowledge_job(job_id)
        >>> print(f"Acknowledged at: {result['acknowledged_at']}")
    """
    response = self._transport.request("POST", f"/api/v1/jobs/{job_id}/acknowledge")
    return response

list_jobs(status=None, limit=100, offset=0)

List jobs for the authenticated user.

Parameters:

Name Type Description Default
status Optional[str]

Filter by status (queued, processing, completed, failed)

None
limit int

Maximum number of jobs to return

100
offset int

Pagination offset

0

Returns:

Type Description
List[JobDetails]

List[JobDetails]: List of job details

Source code in microdc/client.py
def list_jobs(
    self,
    status: Optional[str] = None,
    limit: int = 100,
    offset: int = 0,
) -> List[JobDetails]:
    """
    List jobs for the authenticated user.

    Args:
        status: Filter by status (queued, processing, completed, failed)
        limit: Maximum number of jobs to return
        offset: Pagination offset

    Returns:
        List[JobDetails]: List of job details
    """
    params: Dict[str, Any] = {"limit": limit, "offset": offset}
    if status:
        params["status"] = status

    response = self._transport.request("GET", "/api/v1/jobs", params=params)
    jobs = response.get("jobs", [])

    return [self._parse_job_details(job_data) for job_data in jobs]

set_callback(callback)

Set callback function to be called when jobs complete.

Parameters:

Name Type Description Default
callback Callable[[Client, str], None]

Function that takes (Client, job_id) as arguments

required
Source code in microdc/client.py
def set_callback(self, callback: Callable[["Client", str], None]) -> None:
    """
    Set callback function to be called when jobs complete.

    Args:
        callback: Function that takes (Client, job_id) as arguments
    """

    def wrapper(job_id: str) -> None:
        callback(self, job_id)

    self._polling.set_callback(wrapper)

wait_for_all(timeout=None)

Block until all pending jobs complete.

Parameters:

Name Type Description Default
timeout Optional[float]

Maximum seconds to wait (None = wait forever)

None

Raises:

Type Description
TimeoutError

If timeout exceeded

Source code in microdc/client.py
def wait_for_all(self, timeout: Optional[float] = None) -> None:
    """
    Block until all pending jobs complete.

    Args:
        timeout: Maximum seconds to wait (None = wait forever)

    Raises:
        TimeoutError: If timeout exceeded
    """
    start_time = time.time()

    while True:
        if not self._polling.has_pending_jobs():
            return

        if timeout is not None:
            elapsed = time.time() - start_time
            if elapsed >= timeout:
                raise TimeoutError(f"Timeout waiting for jobs after {timeout}s")

        time.sleep(0.1)

wait_for_job(job_id, timeout=None)

Block until specific job completes.

Parameters:

Name Type Description Default
job_id str

Unique job identifier

required
timeout Optional[float]

Maximum seconds to wait (None = wait forever)

None

Returns:

Name Type Description
JobDetails JobDetails

Completed job details

Raises:

Type Description
TimeoutError

If timeout exceeded

Source code in microdc/client.py
def wait_for_job(self, job_id: str, timeout: Optional[float] = None) -> JobDetails:
    """
    Block until specific job completes.

    Args:
        job_id: Unique job identifier
        timeout: Maximum seconds to wait (None = wait forever)

    Returns:
        JobDetails: Completed job details

    Raises:
        TimeoutError: If timeout exceeded
    """
    start_time = time.time()

    while True:
        details = self.get_job_details(job_id)
        if details.is_completed():
            return details

        if timeout is not None:
            elapsed = time.time() - start_time
            if elapsed >= timeout:
                raise TimeoutError(f"Timeout waiting for job {job_id} after {timeout}s")

        time.sleep(2.0)

upload_file(file_path, description=None)

Upload a file to MicroDC platform.

Parameters:

Name Type Description Default
file_path str

Path to file to upload

required
description Optional[str]

Optional description of the file

None

Returns:

Type Description
Dict[str, Any]

Dict containing file metadata (id, original_name, file_path, file_size)

Raises:

Type Description
APIError

Upload failed

Source code in microdc/client.py
def upload_file(
    self,
    file_path: str,
    description: Optional[str] = None,
) -> Dict[str, Any]:
    """
    Upload a file to MicroDC platform.

    Args:
        file_path: Path to file to upload
        description: Optional description of the file

    Returns:
        Dict containing file metadata (id, original_name, file_path, file_size)

    Raises:
        APIError: Upload failed
    """
    import os

    filename = os.path.basename(file_path)

    # Read file content
    with open(file_path, "rb") as f:
        file_content = f.read()

    # Prepare multipart form data
    # When using httpx, we pass files as tuples and any additional form fields via data
    files = {"file": (filename, file_content, "application/pdf")}

    # Add description as form data if provided
    data = {"description": description} if description else {}

    response = self._transport.request(
        "POST",
        "/api/files/upload",
        files=files,
        data=data if data else None,
    )

    return response

create_download_token(file_id, expires_in_minutes=60, max_uses=1)

Create a one-time download token for a file.

Parameters:

Name Type Description Default
file_id str

ID of uploaded file

required
expires_in_minutes int

Token expiration time (1-10080 minutes, default 60)

60
max_uses int

Maximum number of uses (1-100, default 1)

1

Returns:

Type Description
Dict[str, Any]

Dict containing token, download_url, and expires_at

Raises:

Type Description
APIError

Token creation failed

Source code in microdc/client.py
def create_download_token(
    self,
    file_id: str,
    expires_in_minutes: int = 60,
    max_uses: int = 1,
) -> Dict[str, Any]:
    """
    Create a one-time download token for a file.

    Args:
        file_id: ID of uploaded file
        expires_in_minutes: Token expiration time (1-10080 minutes, default 60)
        max_uses: Maximum number of uses (1-100, default 1)

    Returns:
        Dict containing token, download_url, and expires_at

    Raises:
        APIError: Token creation failed
    """
    params = {
        "expires_in_minutes": expires_in_minutes,
        "max_uses": max_uses,
    }

    response = self._transport.request(
        "POST",
        f"/api/files/{file_id}/create-download-token",
        params=params,
    )

    return response

upload_and_tokenize(file_path, description=None, expires_in_minutes=60, max_uses=1)

Upload file and create download token in one step.

Parameters:

Name Type Description Default
file_path str

Path to file to upload

required
description Optional[str]

Optional description of the file

None
expires_in_minutes int

Token expiration time (default 60)

60
max_uses int

Maximum number of uses (default 1)

1

Returns:

Type Description
Dict[str, Any]

Dict containing file metadata and download token

Raises:

Type Description
APIError

Upload or token creation failed

Source code in microdc/client.py
def upload_and_tokenize(
    self,
    file_path: str,
    description: Optional[str] = None,
    expires_in_minutes: int = 60,
    max_uses: int = 1,
) -> Dict[str, Any]:
    """
    Upload file and create download token in one step.

    Args:
        file_path: Path to file to upload
        description: Optional description of the file
        expires_in_minutes: Token expiration time (default 60)
        max_uses: Maximum number of uses (default 1)

    Returns:
        Dict containing file metadata and download token

    Raises:
        APIError: Upload or token creation failed
    """
    # Upload file
    file_data = self.upload_file(file_path, description)
    file_id = file_data["id"]

    # Create download token
    token_data = self.create_download_token(file_id, expires_in_minutes, max_uses)

    # Combine results
    return {
        "file_id": file_id,
        "file_metadata": file_data,
        "token": token_data["token"],
        "download_url": token_data["download_url"],
        "expires_at": token_data["expires_at"],
    }

close()

Clean up resources and stop polling thread.

Source code in microdc/client.py
def close(self) -> None:
    """Clean up resources and stop polling thread."""
    self._polling.stop()
    self._transport.close()