03 — Python Deep Dive for Interviews

Priority: HIGH — Python is your primary language. Expect deep questions. They’ll test internals, concurrency, frameworks, and production patterns.


Table of Contents

  1. Python Internals
  2. Concurrency & Parallelism
  3. Advanced Language Features
  4. FastAPI Deep Dive
  5. Django Deep Dive
  6. Testing
  7. Performance & Profiling
  8. Common Interview Questions
  9. Resources

Python Internals

The GIL (Global Interpreter Lock)

What: A mutex in CPython that allows only ONE thread to execute Python
bytecode at a time.

Why it exists:
  - CPython's memory management (reference counting) is not thread-safe
  - The GIL simplifies the implementation of CPython
  - Without it, every object would need its own lock

Impact:
  - CPU-bound threads: GIL is a bottleneck (threads are effectively serial)
  - I/O-bound threads: GIL is released during I/O operations → concurrency ok
  - Multi-processing: each process has its own GIL → true parallelism

When asked "How do you handle CPU-bound tasks?":
  1. multiprocessing module (separate processes)
  2. concurrent.futures.ProcessPoolExecutor
  3. C extensions that release the GIL (NumPy, etc.)
  4. Dask for distributed computation (you used this!)
  5. Rust extensions via PyO3/maturin (your Rust knowledge is valuable here!)

When asked "How do you handle I/O-bound tasks?":
  1. asyncio (event loop, non-blocking I/O)
  2. threading module
  3. concurrent.futures.ThreadPoolExecutor
  4. aiohttp, httpx for async HTTP

Memory Management

Reference Counting:
  - Every object has a reference count
  - When count drops to 0, object is freed
  - sys.getrefcount(obj) to check (count is always +1 from the call itself)

Garbage Collection (for cycles):
  - Reference counting can't handle circular references
  - Python's gc module uses generational garbage collection
  - 3 generations: 0 (young), 1, 2 (old)
  - Objects that survive collections move to older generations
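The cycle collector can be demonstrated directly. A minimal sketch (the `Node` class is illustrative; `gc.disable()` just makes the run deterministic):

```python
import gc
import weakref

class Node:
    """Two instances pointing at each other form a reference cycle."""
    def __init__(self):
        self.peer = None

gc.disable()                 # no automatic collections during the demo
a, b = Node(), Node()
a.peer, b.peer = b, a        # cycle: refcounts can never drop to 0
probe = weakref.ref(a)       # observes whether `a` is still alive

del a, b
assert probe() is not None   # refcounting alone could not free the cycle
gc.collect()                 # the cycle collector finds and frees it
assert probe() is None
gc.enable()
```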

Memory Optimization:
  - __slots__: prevents __dict__ per instance, saves memory
  - Generators: lazy evaluation, O(1) memory for iteration
  - interning: Python caches small integers (-5 to 256) and short strings
  - weakref: references that don't prevent garbage collection
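One practical use of weakref is a cache whose entries vanish as soon as nothing else holds the object. A small sketch (the `Resource` class is an illustrative name):

```python
import weakref

class Resource:
    def __init__(self, name):
        self.name = name

cache = weakref.WeakValueDictionary()  # entries disappear with the last strong ref
r = Resource("cfg")
cache["cfg"] = r
assert cache["cfg"] is r

del r                        # last strong reference gone (CPython frees immediately)
assert "cfg" not in cache    # the entry was removed automatically
```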

Object Model

Everything is an object:
  - Functions, classes, modules — all objects
  - type(42) → <class 'int'>
  - type(int) → <class 'type'>
  - type(type) → <class 'type'> (metaclass)

Mutable vs Immutable:
  Immutable: int, float, str, tuple, frozenset, bytes
  Mutable: list, dict, set, bytearray, custom objects

  Why it matters:
  - Immutable objects can be dictionary keys and set members
  - Immutable doesn't mean "can't change variable" — it means the object itself
  - tuple of lists: tuple is immutable, but contained lists are mutable!

Copy semantics:
  - Assignment: x = y → both point to same object
  - Shallow copy: copy.copy() or list.copy() → new container, same elements
  - Deep copy: copy.deepcopy() → new container + new copies of all elements
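The three copy semantics in action, as a minimal sketch:

```python
import copy

orig = [[1, 2], [3, 4]]
alias = orig                  # assignment: same object
shallow = copy.copy(orig)     # new outer list, same inner lists
deep = copy.deepcopy(orig)    # fully independent structure

orig[0].append(99)
assert alias is orig
assert shallow is not orig and shallow[0] is orig[0]
assert shallow[0] == [1, 2, 99]   # shallow copy sees the mutation
assert deep[0] == [1, 2]          # deep copy does not
```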

Dunder methods (Magic methods):
  __init__: constructor
  __repr__: developer-facing string (unambiguous)
  __str__: user-facing string
  __eq__, __hash__: equality and hashing
  __lt__, __gt__: comparison (for sorting)
  __enter__, __exit__: context manager protocol
  __iter__, __next__: iterator protocol
  __getitem__, __setitem__: indexing
  __call__: make instance callable
  __len__: len() support
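A small class exercising several of these dunders (the `Version` class is illustrative; `functools.total_ordering` derives the remaining comparisons from __eq__ and __lt__):

```python
import functools

@functools.total_ordering
class Version:
    def __init__(self, major, minor):
        self.major, self.minor = major, minor

    def __repr__(self):                 # unambiguous, developer-facing
        return f"Version({self.major}, {self.minor})"

    def __str__(self):                  # user-facing
        return f"{self.major}.{self.minor}"

    def __eq__(self, other):
        return (self.major, self.minor) == (other.major, other.minor)

    def __hash__(self):                 # needed once __eq__ is overridden
        return hash((self.major, self.minor))

    def __lt__(self, other):
        return (self.major, self.minor) < (other.major, other.minor)

v1, v2 = Version(1, 4), Version(2, 0)
assert str(v2) == "2.0" and repr(v1) == "Version(1, 4)"
assert v1 < v2 and sorted([v2, v1])[0] == v1
assert {Version(1, 4)} == {v1}          # hashable, so usable in sets/dict keys
```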

Concurrency & Parallelism

asyncio (Most Important for Your Profile)

# --- Basic async/await ---
import asyncio

import aiohttp  # third-party: pip install aiohttp

async def fetch_data(url: str) -> dict:
    """async functions return coroutines, not values."""
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            return await response.json()

async def main():
    # Run multiple coroutines concurrently
    results = await asyncio.gather(
        fetch_data("https://api.example.com/1"),
        fetch_data("https://api.example.com/2"),
        fetch_data("https://api.example.com/3"),
    )
    return results

asyncio.run(main())

Key Concepts:

Event Loop:
  - Single-threaded scheduler that manages coroutines
  - Runs coroutines, handles I/O, fires callbacks
  - Only ONE coroutine runs at a time (cooperative multitasking)
  - Coroutines "yield control" at await points

Coroutine vs Task vs Future:
  - Coroutine: async def function (not yet running)
  - Task: a scheduled coroutine (created by asyncio.create_task())
  - Future: placeholder for an eventual result
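The coroutine/task distinction in a runnable sketch (`square` is an illustrative coroutine):

```python
import asyncio

async def square(n):
    await asyncio.sleep(0)       # yield control to the event loop
    return n * n

async def main():
    coro = square(3)                      # coroutine object: nothing runs yet
    task = asyncio.create_task(coro)      # Task: now scheduled on the loop
    others = await asyncio.gather(square(1), square(2))  # ordered results
    return [await task] + others

results = asyncio.run(main())
assert results == [9, 1, 4]
```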

asyncio.gather vs asyncio.wait:
  - gather: run awaitables concurrently, return ordered results
  - wait: more control (FIRST_COMPLETED, FIRST_EXCEPTION, ALL_COMPLETED)

asyncio.create_task:
  - Schedules coroutine to run concurrently
  - Must hold reference to task (or it may be garbage collected!)

Common patterns:
  # Semaphore to limit concurrency
  sem = asyncio.Semaphore(10)
  async def limited_fetch(url):
      async with sem:
          return await fetch(url)

  # Timeout
  try:
      result = await asyncio.wait_for(coroutine, timeout=5.0)
  except asyncio.TimeoutError:
      print("Timed out")

  # Queue for producer/consumer
  queue = asyncio.Queue()
  async def producer():
      await queue.put(item)
  async def consumer():
      item = await queue.get()
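The producer/consumer pattern above, fleshed out into a runnable sketch (a `None` sentinel signals end of work; the names are illustrative):

```python
import asyncio

async def producer(queue, items):
    for item in items:
        await queue.put(item)    # blocks when the queue is full (backpressure)
    await queue.put(None)        # sentinel: no more work

async def consumer(queue, out):
    while (item := await queue.get()) is not None:
        out.append(item * 2)

async def main():
    queue, out = asyncio.Queue(maxsize=2), []
    await asyncio.gather(producer(queue, [1, 2, 3]), consumer(queue, out))
    return out

result = asyncio.run(main())
assert result == [2, 4, 6]
```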

Threading vs Multiprocessing vs Asyncio

| Feature           | threading        | multiprocessing  | asyncio          |
|-------------------|------------------|------------------|------------------|
| Concurrency model | Preemptive       | True parallel    | Cooperative      |
| GIL impact        | Limited by GIL   | Bypasses GIL     | N/A (single thread) |
| Best for          | I/O-bound        | CPU-bound        | I/O-bound (many) |
| Memory sharing    | Shared memory    | Separate memory  | Shared memory    |
| Overhead          | Moderate         | High (processes) | Low              |
| Complexity        | Race conditions  | IPC complexity   | async/await      |
| Scaling           | ~100s threads    | ~10s processes   | ~10,000s tasks   |

Your answer: "At Intensel, I use asyncio with FastAPI for handling
hundreds of concurrent API requests efficiently. For CPU-intensive
geospatial processing, I use Dask which distributes work across
multiple processes/machines, effectively bypassing the GIL."

concurrent.futures

from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

# Thread pool (I/O-bound)
with ThreadPoolExecutor(max_workers=10) as executor:
    futures = [executor.submit(fetch_url, url) for url in urls]
    results = [f.result() for f in futures]

# Process pool (CPU-bound)
with ProcessPoolExecutor(max_workers=4) as executor:
    results = list(executor.map(cpu_heavy_task, data_chunks))

Advanced Language Features

Decorators

# Function decorator
import functools
import time

def retry(max_attempts=3, delay=1):
    """Decorator with parameters."""
    def decorator(func):
        @functools.wraps(func)  # preserves function metadata
        def wrapper(*args, **kwargs):
            for attempt in range(max_attempts):
                try:
                    return func(*args, **kwargs)
                except Exception as e:
                    if attempt == max_attempts - 1:
                        raise
                    time.sleep(delay * (2 ** attempt))  # exponential backoff
        return wrapper
    return decorator

@retry(max_attempts=3, delay=1)
def call_external_api():
    ...

# Class decorator
def singleton(cls):
    instances = {}
    @functools.wraps(cls)
    def get_instance(*args, **kwargs):
        if cls not in instances:
            instances[cls] = cls(*args, **kwargs)
        return instances[cls]
    return get_instance
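The retry decorator can be exercised end to end. A self-contained sketch (delay=0 so the demo runs instantly; `flaky` stands in for a real external call):

```python
import functools
import time

def retry(max_attempts=3, delay=0):
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(max_attempts):
                try:
                    return func(*args, **kwargs)
                except Exception:
                    if attempt == max_attempts - 1:
                        raise
                    time.sleep(delay * (2 ** attempt))
        return wrapper
    return decorator

calls = {"n": 0}

@retry(max_attempts=3, delay=0)
def flaky():
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError("transient failure")
    return "ok"

assert flaky() == "ok"
assert calls["n"] == 3    # failed twice, succeeded on the third attempt
```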

Generators & Iterators

# Generator function (lazy evaluation)
def read_large_file(path):
    """Reads file line by line — O(1) memory regardless of file size."""
    with open(path) as f:
        for line in f:
            yield line.strip()

# Generator expression
squares = (x*x for x in range(1_000_000))  # values produced one at a time, on demand

# Iterator protocol
class Countdown:
    def __init__(self, start):
        self.current = start

    def __iter__(self):
        return self

    def __next__(self):
        if self.current <= 0:
            raise StopIteration
        value = self.current
        self.current -= 1
        return value

# yield from (delegation)
def chain(*iterables):
    for it in iterables:
        yield from it
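Generators compose naturally into lazy pipelines; each stage pulls from the previous one on demand. A small sketch (stage names are illustrative):

```python
def stripped(lines):
    for line in lines:
        yield line.strip()

def as_ints(lines):
    for line in lines:
        if line.isdigit():
            yield int(line)

raw = ["  1 ", "x", " 23", "4"]
pipeline = as_ints(stripped(raw))   # nothing runs yet: fully lazy
first_pass = list(pipeline)
assert first_pass == [1, 23, 4]     # non-numeric "x" was filtered out
assert list(pipeline) == []         # generators are single-use once exhausted
```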

Context Managers

# Using __enter__ and __exit__
class DatabaseConnection:
    def __enter__(self):
        self.conn = create_connection()
        return self.conn

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.conn.close()
        return False  # don't suppress exceptions

# Using contextlib
import time
from contextlib import contextmanager

@contextmanager
def timer(label):
    start = time.time()
    try:
        yield
    finally:  # report timing even if the body raises
        print(f"{label}: {time.time() - start:.3f}s")

with timer("query"):
    result = db.execute(query)

# Async context manager
class AsyncDBPool:
    async def __aenter__(self):
        self.pool = await create_pool()
        return self.pool

    async def __aexit__(self, *args):
        await self.pool.close()

Metaclasses & Descriptors (Advanced)

# Metaclass: class of a class
class SingletonMeta(type):
    _instances = {}
    def __call__(cls, *args, **kwargs):
        if cls not in cls._instances:
            cls._instances[cls] = super().__call__(*args, **kwargs)
        return cls._instances[cls]

class Database(metaclass=SingletonMeta):
    pass

# Descriptor protocol (__get__, __set__, __delete__)
class Validated:
    def __init__(self, min_val=None, max_val=None):
        self.min_val = min_val
        self.max_val = max_val

    def __set_name__(self, owner, name):
        self.name = name

    def __set__(self, obj, value):
        if self.min_val is not None and value < self.min_val:
            raise ValueError(f"{self.name} must be >= {self.min_val}")
        if self.max_val is not None and value > self.max_val:
            raise ValueError(f"{self.name} must be <= {self.max_val}")
        obj.__dict__[self.name] = value

    def __get__(self, obj, objtype=None):
        return obj.__dict__.get(self.name)
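The descriptor in use, restated here so the block runs standalone (`Order` and `quantity` are illustrative names):

```python
class Validated:
    def __init__(self, min_val=None, max_val=None):
        self.min_val, self.max_val = min_val, max_val

    def __set_name__(self, owner, name):
        self.name = name

    def __set__(self, obj, value):
        if self.min_val is not None and value < self.min_val:
            raise ValueError(f"{self.name} must be >= {self.min_val}")
        if self.max_val is not None and value > self.max_val:
            raise ValueError(f"{self.name} must be <= {self.max_val}")
        obj.__dict__[self.name] = value

    def __get__(self, obj, objtype=None):
        return obj.__dict__.get(self.name)

class Order:
    quantity = Validated(min_val=1, max_val=100)   # one descriptor, reused per field

o = Order()
o.quantity = 5
assert o.quantity == 5
try:
    o.quantity = 0          # below min_val
except ValueError:
    pass
else:
    raise AssertionError("validation should have rejected 0")
```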

Type Hints (built-in generics like list[int]: 3.9+; X | Y unions: 3.10+)

from typing import Optional, Union, TypeVar, Generic, Protocol

# Basic
def greet(name: str) -> str: ...
def process(items: list[int]) -> dict[str, int]: ...

# Optional (value or None)
def find(id: int) -> Optional[User]: ...  # same as User | None

# Union
def parse(data: str | bytes) -> dict: ...

# TypeVar (generics)
T = TypeVar("T")
def first(items: list[T]) -> T:
    return items[0]

# Protocol (structural subtyping — duck typing with types)
class Renderable(Protocol):
    def render(self) -> str: ...

def display(item: Renderable) -> None:
    print(item.render())
# Any class with a render() method works — no inheritance needed
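A runnable version of the Protocol example, adding `runtime_checkable` (which also enables isinstance checks against the protocol):

```python
from typing import Protocol, runtime_checkable

@runtime_checkable
class Renderable(Protocol):
    def render(self) -> str: ...

class Button:                         # note: no inheritance from Renderable
    def render(self) -> str:
        return "<button>"

def display(item: Renderable) -> str:
    return item.render()

assert display(Button()) == "<button>"
assert isinstance(Button(), Renderable)   # structural check: has render()
```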

Dataclasses & Pydantic

from dataclasses import dataclass, field
from pydantic import BaseModel, Field, validator

# Dataclass (stdlib)
@dataclass
class Point:
    x: float
    y: float
    label: str = "origin"

    def distance(self) -> float:
        return (self.x**2 + self.y**2) ** 0.5

# Pydantic model (validation + serialization)
class UserCreate(BaseModel):
    name: str = Field(..., min_length=1, max_length=100)
    email: str
    age: int = Field(ge=0, le=150)

    @validator("email")  # Pydantic v1 style; in v2 use @field_validator
    def validate_email(cls, v):
        if "@" not in v:
            raise ValueError("Invalid email")
        return v.lower()

# Dataclass vs Pydantic:
# - Dataclass: lightweight, stdlib, no validation
# - Pydantic: validation, serialization, FastAPI integration

FastAPI Deep Dive

Core Concepts

from fastapi import FastAPI, Depends, HTTPException, BackgroundTasks, Query
from fastapi.middleware.cors import CORSMiddleware

app = FastAPI()

# --- Dependency Injection ---
def get_db():
    """Sync generator dependency; FastAPI runs the cleanup after the response."""
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()

@app.get("/users/{user_id}")
def get_user(user_id: int, db: Session = Depends(get_db)):
    # sync DB access → plain def, so FastAPI runs it in the threadpool
    user = db.query(User).get(user_id)
    if not user:
        raise HTTPException(status_code=404, detail="User not found")
    return user

# --- Background Tasks ---
@app.post("/send-email")
async def send_email(email: str, background_tasks: BackgroundTasks):
    background_tasks.add_task(send_email_async, email)
    return {"message": "Email queued"}

# --- Middleware ---
@app.middleware("http")
async def add_timing_header(request, call_next):
    start = time.time()
    response = await call_next(request)
    response.headers["X-Process-Time"] = str(time.time() - start)
    return response

# --- Query parameters with validation ---
@app.get("/items")
async def list_items(
    skip: int = Query(0, ge=0),
    limit: int = Query(10, ge=1, le=100),
    q: Optional[str] = None
):
    ...

Interview Questions About FastAPI

Q: How does FastAPI achieve high performance?
A: - Built on Starlette (ASGI framework) and Uvicorn (ASGI server)
   - ASGI is async-native, unlike WSGI (Flask, classic Django; Django itself supports ASGI since 3.0)
   - Uses uvloop (fast event loop) when available
   - Pydantic V2 for fast validation (Rust-compiled core)
   - Async request handling for I/O-bound operations

Q: Explain FastAPI's dependency injection system.
A: - Dependencies are declared as function parameters with Depends()
   - Can be sync or async functions
   - Support yield (for cleanup, like DB sessions)
   - Can be nested (dependencies can have dependencies)
   - Cached per-request by default (same dependency = same instance)
   - Great for: DB sessions, auth, config, rate limiting

Q: async def vs def in FastAPI endpoints?
A: - async def: runs in the event loop, use for async I/O operations
   - def: runs in a thread pool (threadpool executor), use for sync/blocking code
   - Mixing: if your endpoint calls sync DB code, use def (FastAPI handles it)
   - If your endpoint uses async DB (asyncpg), use async def

Q: How do you handle auth in FastAPI?
A: - OAuth2PasswordBearer for token-based auth
   - JWT tokens (encode/decode with python-jose or PyJWT)
   - Dependency injection for auth middleware
   - Scopes for role-based access control

Q: How do you test FastAPI?
A: - TestClient (based on httpx) for sync tests
   - AsyncClient (httpx) for async tests
   - Override dependencies with app.dependency_overrides
   - pytest fixtures for test database setup

Django Deep Dive

Key Concepts for Interviews

ORM:
  - QuerySets are lazy (not evaluated until iterated)
  - select_related: JOIN (foreign key, one-to-one — single query)
  - prefetch_related: separate query (many-to-many, reverse FK)
  - .only() / .defer(): partial field loading
  - .values() / .values_list(): return dicts/tuples instead of objects
  - Q objects for complex queries: Q(name="a") | Q(name="b")
  - F expressions for database-level operations: F('price') * 1.1
  - Aggregation: .aggregate(avg=Avg('price'))
  - Annotation: .annotate(total=Sum('orderitem__quantity'))

Migrations:
  - makemigrations: generate migration files from model changes
  - migrate: apply migrations to database
  - Custom migrations: RunPython for data migrations
  - Squashing: combine multiple migrations into one

Signals:
  - pre_save, post_save, pre_delete, post_delete
  - Use sparingly — hard to debug, implicit coupling
  - Better alternative: explicit service layer methods

Middleware:
  - Process request → view → response pipeline
  - SecurityMiddleware, SessionMiddleware, CsrfViewMiddleware
  - Custom middleware for logging, auth, rate limiting

Caching:
  - Per-view caching: @cache_page(60 * 15)
  - Template fragment caching: {% cache 300 sidebar %}
  - Low-level cache: cache.get(), cache.set()
  - Backends: Redis, Memcached, database, file, local memory

Testing

# --- pytest basics ---
import pytest

def test_add():
    assert add(2, 3) == 5

# Parametrize
@pytest.mark.parametrize("text,expected", [
    ("hello", 5),
    ("", 0),
    ("a", 1),
])
def test_string_length(text, expected):
    assert len(text) == expected

# Fixtures
@pytest.fixture
def db_session():
    session = create_test_session()
    yield session
    session.rollback()
    session.close()

def test_create_user(db_session):
    user = create_user(db_session, name="Karan")
    assert user.id is not None

# Mocking
from unittest.mock import patch, MagicMock

@patch("mymodule.external_api_call")
def test_process_data(mock_api):
    mock_api.return_value = {"status": "ok"}
    result = process_data()
    assert result == expected
    mock_api.assert_called_once()

# Async testing (requires the pytest-asyncio plugin)
@pytest.mark.asyncio
async def test_async_fetch():
    result = await fetch_data("test-url")
    assert result is not None

# FastAPI testing
from fastapi.testclient import TestClient

def test_read_items():
    client = TestClient(app)
    response = client.get("/items")
    assert response.status_code == 200

Performance & Profiling

# --- Timing ---
import time

start = time.perf_counter()
# ... code ...
elapsed = time.perf_counter() - start

# --- Profiling ---
import cProfile
cProfile.run('my_function()')

# Line profiler (pip install line-profiler)
@profile
def slow_function():
    ...

# Memory profiler (pip install memory-profiler)
@profile
def memory_heavy():
    ...

# --- Common Optimizations ---
# 1. Use sets for membership testing
if item in large_set:    # O(1) ✓
if item in large_list:   # O(n) ✗

# 2. Use generators for large data
sum(x*x for x in range(1_000_000))  # one item in memory at a time

# 3. Avoid string concatenation in loops
parts = []
for item in items:
    parts.append(str(item))
result = ''.join(parts)  # O(n) instead of O(n²)

# 4. Use built-in functions (implemented in C)
sum(), min(), max(), sorted(), any(), all()

# 5. Local variable lookup is faster than global
def process():
    local_func = some_module.function  # cache the lookup
    for item in items:
        local_func(item)

# 6. __slots__ for memory-efficient classes
class Point:
    __slots__ = ('x', 'y')
    def __init__(self, x, y):
        self.x = x
        self.y = y
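The effect of __slots__ can be checked directly (class names are illustrative):

```python
class PlainPoint:
    def __init__(self, x, y):
        self.x, self.y = x, y

class SlimPoint:
    __slots__ = ("x", "y")
    def __init__(self, x, y):
        self.x, self.y = x, y

p, s = PlainPoint(1, 2), SlimPoint(1, 2)
assert hasattr(p, "__dict__")
assert not hasattr(s, "__dict__")   # no per-instance dict → less memory
try:
    s.z = 3                          # slots also block surprise attributes
except AttributeError:
    pass
else:
    raise AssertionError("expected AttributeError")
```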

Common Interview Questions

Core Python

Q: What's the difference between is and ==?
A: `is` checks identity (same object in memory), `==` checks equality (same value).
   a = [1, 2]; b = [1, 2]
   a == b → True; a is b → False

Q: How are Python dictionaries implemented?
A: Hash tables. Keys are hashed → mapped to slots in an array.
   Collisions resolved with open addressing (probing).
   Average O(1) lookup/insert, worst case O(n).
   Since Python 3.7, dicts maintain insertion order.

Q: Explain Python decorators.
A: A decorator is a function that takes a function and returns a modified function.
   @decorator syntax is sugar for: func = decorator(func)
   Uses: logging, caching, auth, retry logic, timing.

Q: What are Python generators and when would you use them?
A: Functions that yield values lazily using `yield`.
   Use when: processing large datasets (file lines, DB rows),
   infinite sequences, pipeline processing.
   Memory efficient: O(1) vs O(n) for storing all results.

Q: Explain Python's MRO (Method Resolution Order).
A: C3 linearization algorithm. For class C(A, B):
   Looks in C → A → B → object order.
   Use ClassName.__mro__ to inspect.
   super() follows MRO, not just the parent class.
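The classic diamond case as a runnable sketch (class names are illustrative):

```python
class A:
    def who(self): return "A"

class B(A):
    def who(self): return "B"

class C(A):
    def who(self): return "C"

class D(B, C):
    pass

# C3 linearization: D → B → C → A → object
assert [cls.__name__ for cls in D.__mro__] == ["D", "B", "C", "A", "object"]
assert D().who() == "B"    # first match along the MRO wins
```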

Q: What is a closure?
A: A function that captures variables from its enclosing scope.
   The captured variables persist even after the outer function returns.
   Used in: decorators, callbacks, factory functions.

Q: Explain *args and **kwargs.
A: *args: variable positional arguments (tuple)
   **kwargs: variable keyword arguments (dict)
   Used for flexible function signatures, decorator wrappers.

Q: What is the walrus operator (:=)?
A: Assignment expression (Python 3.8+). Assigns and returns a value.
   while (line := f.readline()): process(line)
   if (n := len(data)) > 10: print(f"Large: {n}")

Production Python

Q: How would you handle 100K+ concurrent requests in Python?
A: Use async framework (FastAPI + uvicorn with multiple workers).
   asyncio for I/O-bound work, connection pooling for DB,
   Redis for caching, background tasks for heavy processing,
   horizontal scaling with load balancer.

Q: How do you debug memory leaks in Python?
A: tracemalloc (stdlib), objgraph, memory_profiler.
   Check for: circular references, global caches growing unbounded,
   __del__ in cycles (uncollectable before Python 3.4), file handles not closed.

Q: How do you handle database connection pooling?
A: SQLAlchemy's connection pool (pool_size, max_overflow).
   For FastAPI async: asyncpg pool or encode/databases.
   PgBouncer for external pooling.
   Key: don't create new connections per request.

Resources


My Notes

Python concepts I'm solid on:
-

Concepts I need to review:
-

Code patterns I use daily but can't explain well:
-

Next: 04-databases-and-sql.md