The Elementrix staging database is a managed PostgreSQL database into which you can load data from sources that the built-in connectors do not directly support, and then create data products from it.

Elementrix provides you with a managed PostgreSQL staging database:
Connection Details:
Host: staging-db.elementrix.io
Port: 5432
Database: staging_<your_org_id>
Username: <your_username>
Password: <provided_password>
SSL Mode: Require
Connection String:
postgresql://<username>:<password>@staging-db.elementrix.io:5432/staging_<your_org_id>?sslmode=require
Access:
Use any standard PostgreSQL tool or method to load data into the staging database:
Option A: SQL Client (psql):
# Connect to the staging database
psql "postgresql://username:password@staging-db.elementrix.io:5432/staging_org123?sslmode=require"

-- Create your table
CREATE TABLE customer_data (
    customer_id UUID PRIMARY KEY,
    name VARCHAR(200),
    email VARCHAR(200),
    created_at TIMESTAMP DEFAULT NOW()
);

-- Load data from CSV
\COPY customer_data FROM 'customers.csv' CSV HEADER;

-- Or insert rows directly
INSERT INTO customer_data (customer_id, name, email)
VALUES ('550e8400-e29b-41d4-a716-446655440000', 'John Doe', 'john@example.com');
Option B: GUI Tools (pgAdmin, DBeaver, DataGrip): connect with the connection details above (SSL mode: Require), then create tables and load data through the tool's query editor or import wizard.
Option C: ETL Tools (Airflow, dbt, custom scripts):
# Python example: pandas + SQLAlchemy (psycopg2 driver)
import pandas as pd
from pymongo import MongoClient
from sqlalchemy import create_engine

# Connect to the staging database
# (pandas.DataFrame.to_sql needs a SQLAlchemy engine, not a raw psycopg2 connection)
engine = create_engine(
    "postgresql+psycopg2://username:password@staging-db.elementrix.io:5432/staging_org123?sslmode=require"
)

# Load data from an external source (e.g., MongoDB)
mongo_client = MongoClient("mongodb://mongo-host:27017")
data = list(mongo_client.production.customers.find())

# Convert to a DataFrame and load into staging
df = pd.DataFrame(data)
df = df.drop(columns=["_id"])  # drop MongoDB's ObjectId before loading
df.to_sql("customer_data", engine, if_exists="replace", index=False)

engine.dispose()
Option D: Oracle to Staging:
# Extract from Oracle and load into PostgreSQL staging
import cx_Oracle
import psycopg2

# Connect to Oracle
oracle_conn = cx_Oracle.connect("user/password@oracle-host:1521/ORCL")
oracle_cursor = oracle_conn.cursor()

# Connect to PostgreSQL staging
pg_conn = psycopg2.connect(
    "postgresql://username:password@staging-db.elementrix.io:5432/staging_org123?sslmode=require"
)
pg_cursor = pg_conn.cursor()

# Extract from Oracle
oracle_cursor.execute("SELECT * FROM production_table")
rows = oracle_cursor.fetchall()

# Create the table in staging
pg_cursor.execute("""
    CREATE TABLE IF NOT EXISTS production_table (
        id BIGINT PRIMARY KEY,
        name VARCHAR(200),
        created_at TIMESTAMP
    )
""")

# Load into staging
for row in rows:
    pg_cursor.execute(
        "INSERT INTO production_table VALUES (%s, %s, %s)",
        row
    )
pg_conn.commit()
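For larger extracts, the row-by-row loop above gets slow. A minimal batching sketch, continuing the same example (same pg_cursor, rows, and table; page_size is illustrative), using psycopg2's execute_values helper:

from psycopg2.extras import execute_values

# Insert the extracted rows in batches instead of one INSERT per row
execute_values(
    pg_cursor,
    "INSERT INTO production_table (id, name, created_at) VALUES %s",
    rows,
    page_size=1000,
)
pg_conn.commit()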
Once your data is in the staging database, create a data product using the Elementrix staging connector:

Select the staging tables you want to expose (for example, customer_data). The process is identical to connecting to a regular PostgreSQL database.
Host: staging-db.elementrix.io
Port: 5432
Database: staging_org123
Username: org123_admin
Password: ••••••••••
Using psql (Command Line):
# Set the password via an environment variable (optional)
export PGPASSWORD='your_password'

# Connect
psql -h staging-db.elementrix.io -p 5432 -U org123_admin -d staging_org123

# Once connected, test the session: \dt lists tables, \l lists databases
\dt
\l
Using pgAdmin (GUI): register a new server with the host, port, database, and credentials above, and set SSL mode to Require.
Using Python:
import psycopg2

# Connection parameters
conn_params = {
    "host": "staging-db.elementrix.io",
    "port": 5432,
    "database": "staging_org123",
    "user": "org123_admin",
    "password": "your_password",
    "sslmode": "require",
}

# Connect
conn = psycopg2.connect(**conn_params)
cursor = conn.cursor()

# Test query
cursor.execute("SELECT version();")
print(cursor.fetchone())

cursor.close()
conn.close()
Example 1: Load from API:
import requests
import psycopg2

# Fetch data from the API
response = requests.get("https://api.example.com/customers")
customers = response.json()

# Connect to staging
conn = psycopg2.connect(
    "postgresql://org123_admin:password@staging-db.elementrix.io:5432/staging_org123?sslmode=require"
)
cursor = conn.cursor()

# Create table
cursor.execute("""
    CREATE TABLE IF NOT EXISTS api_customers (
        id UUID PRIMARY KEY,
        name VARCHAR(200),
        email VARCHAR(200),
        signup_date TIMESTAMP,
        created_at TIMESTAMP DEFAULT NOW()
    )
""")

# Upsert data
for customer in customers:
    cursor.execute("""
        INSERT INTO api_customers (id, name, email, signup_date)
        VALUES (%s, %s, %s, %s)
        ON CONFLICT (id) DO UPDATE SET
            name = EXCLUDED.name,
            email = EXCLUDED.email,
            signup_date = EXCLUDED.signup_date
    """, (
        customer['id'],
        customer['name'],
        customer['email'],
        customer['signup_date'],
    ))

conn.commit()
cursor.close()
conn.close()
Example 2: Load from CSV Files:
# Using the psql \COPY command
psql -h staging-db.elementrix.io -U org123_admin -d staging_org123 << EOF
CREATE TABLE IF NOT EXISTS sales_data (
    sale_id BIGINT PRIMARY KEY,
    product_id INT,
    quantity INT,
    amount DECIMAL(10,2),
    sale_date DATE
);
\COPY sales_data FROM '/path/to/sales.csv' CSV HEADER;
EOF
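The same COPY path is also available from Python via psycopg2's copy_expert, which can be convenient inside an existing script. A sketch, assuming the sales_data table above; the file path is illustrative:

import psycopg2

conn = psycopg2.connect(
    "postgresql://org123_admin:password@staging-db.elementrix.io:5432/staging_org123?sslmode=require"
)
cursor = conn.cursor()

# Stream the CSV into the existing sales_data table via COPY
with open("/path/to/sales.csv") as f:
    cursor.copy_expert("COPY sales_data FROM STDIN WITH CSV HEADER", f)

conn.commit()
cursor.close()
conn.close()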
Example 3: Load from MongoDB:
from pymongo import MongoClient
import psycopg2

# Connect to MongoDB
mongo_client = MongoClient("mongodb://mongo-host:27017")
mongo_db = mongo_client.production
orders = mongo_db.orders.find()

# Connect to staging
pg_conn = psycopg2.connect(
    "postgresql://org123_admin:password@staging-db.elementrix.io:5432/staging_org123?sslmode=require"
)
pg_cursor = pg_conn.cursor()

# Create table
pg_cursor.execute("""
    CREATE TABLE IF NOT EXISTS mongo_orders (
        order_id VARCHAR(50) PRIMARY KEY,
        customer_id VARCHAR(50),
        total_amount DECIMAL(10,2),
        status VARCHAR(50),
        order_date TIMESTAMP,
        created_at TIMESTAMP DEFAULT NOW()
    )
""")

# Load data
for order in orders:
    pg_cursor.execute("""
        INSERT INTO mongo_orders (order_id, customer_id, total_amount, status, order_date)
        VALUES (%s, %s, %s, %s, %s)
        ON CONFLICT (order_id) DO UPDATE SET
            customer_id = EXCLUDED.customer_id,
            total_amount = EXCLUDED.total_amount,
            status = EXCLUDED.status,
            order_date = EXCLUDED.order_date
    """, (
        str(order['_id']),
        order.get('customer_id'),
        order.get('total'),
        order.get('status'),
        order.get('order_date'),
    ))

pg_conn.commit()
pg_cursor.close()
pg_conn.close()
Database Type: PostgreSQL
Connection: Elementrix Staging Database (pre-configured)
Available Tables:
☑ api_customers
☐ sales_data
☐ mongo_orders
Scenario: Migrate Oracle production data to Elementrix
Process:
1. Use Oracle client or ETL tool to extract data
2. Transform to PostgreSQL-compatible format
3. Load into staging database
4. Create data product for consumption
Scenario: Expose MongoDB collections as structured data products
Process:
1. Extract documents from MongoDB
2. Flatten and normalize into relational tables (see the sketch after this list)
3. Load into staging database
4. Create data products with proper schema
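Nested documents usually need flattening before they fit a relational schema. A minimal sketch of the flattening step, assuming pandas is available; the collection name and embedded fields are illustrative:

import pandas as pd
from pymongo import MongoClient

# Pull raw documents from a hypothetical orders collection
client = MongoClient("mongodb://mongo-host:27017")
docs = list(client.production.orders.find())

# json_normalize turns nested fields into flat columns (e.g. customer_name)
df = pd.json_normalize(docs, sep="_")
df["_id"] = df["_id"].astype(str)  # ObjectId -> string before loading to PostgreSQL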
Scenario: Regular sync from third-party API
Process:
1. Schedule a script (cron, Airflow) to fetch API data (see the sketch after this list)
2. Transform JSON to tabular format
3. Upsert into staging database
4. Data product automatically stays in sync
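A minimal scheduling sketch: the upsert from Example 1 wrapped in a script that an external scheduler runs on an interval. The script name, cron entry, and API URL are illustrative:

# sync_api_customers.py -- wraps the upsert from Example 1.
# Schedule it externally, e.g. with cron:
#   0 * * * * /usr/bin/python3 /opt/etl/sync_api_customers.py
import requests
import psycopg2

def sync():
    customers = requests.get("https://api.example.com/customers").json()
    conn = psycopg2.connect(
        "postgresql://org123_admin:password@staging-db.elementrix.io:5432/staging_org123?sslmode=require"
    )
    cur = conn.cursor()
    for c in customers:
        cur.execute(
            """
            INSERT INTO api_customers (id, name, email, signup_date)
            VALUES (%s, %s, %s, %s)
            ON CONFLICT (id) DO UPDATE SET
                name = EXCLUDED.name,
                email = EXCLUDED.email,
                signup_date = EXCLUDED.signup_date
            """,
            (c["id"], c["name"], c["email"], c["signup_date"]),
        )
    conn.commit()
    cur.close()
    conn.close()

if __name__ == "__main__":
    sync()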
Scenario: Process daily CSV/Parquet exports
Process:
1. Automated job picks up new files
2. Load files into staging tables (see the sketch after this list)
3. Staging data product provides clean interface
4. Downstream systems consume via Elementrix API
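A sketch of the file pickup step, assuming daily Parquet exports land in a local directory; the directory, table name, and the pyarrow dependency are assumptions:

from pathlib import Path

import pandas as pd
from sqlalchemy import create_engine

engine = create_engine(
    "postgresql+psycopg2://org123_admin:password@staging-db.elementrix.io:5432/staging_org123?sslmode=require"
)

# Append every new Parquet export to the staging table, then mark the file done
for path in Path("/data/exports").glob("*.parquet"):
    df = pd.read_parquet(path)  # requires pyarrow or fastparquet
    df.to_sql("sales_data", engine, if_exists="append", index=False)
    path.rename(path.with_name(path.name + ".done"))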
Scenario: Expose mainframe or legacy system data
Process:
1. Extract data using legacy tools/scripts
2. Convert to modern format
3. Stage in PostgreSQL
4. Data product provides modern API access
-- Good: Normalized structure
CREATE TABLE customers (
    customer_id UUID PRIMARY KEY,
    name VARCHAR(200),
    email VARCHAR(200),
    created_at TIMESTAMP DEFAULT NOW()
);

CREATE TABLE orders (
    order_id UUID PRIMARY KEY,
    customer_id UUID REFERENCES customers(customer_id),
    order_date TIMESTAMP,
    total_amount DECIMAL(10,2)
);

-- Avoid: Denormalized structure
CREATE TABLE orders_with_customer_details (
    order_id UUID PRIMARY KEY,
    customer_id UUID,
    customer_name VARCHAR(200),  -- Duplicate data
    customer_email VARCHAR(200), -- Duplicate data
    order_date TIMESTAMP,
    total_amount DECIMAL(10,2)
);
CREATE TABLE product_data (
    id UUID PRIMARY KEY,
    name VARCHAR(200),
    -- Required audit columns
    created_at TIMESTAMP DEFAULT NOW() NOT NULL,
    updated_at TIMESTAMP DEFAULT NOW() NOT NULL,
    -- Recommended audit columns
    created_by VARCHAR(100),
    updated_by VARCHAR(100),
    deleted_at TIMESTAMP -- For soft deletes
);

-- Update trigger to maintain updated_at
CREATE OR REPLACE FUNCTION update_updated_at_column()
RETURNS TRIGGER AS $$
BEGIN
    NEW.updated_at = NOW();
    RETURN NEW;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER update_product_data_updated_at
BEFORE UPDATE ON product_data
FOR EACH ROW
EXECUTE FUNCTION update_updated_at_column();
-- Primary keys: UUID (recommended) or BIGINT
CREATE TABLE transactions (
    transaction_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    amount DECIMAL(10,2)
);

-- Or BIGINT with a sequence
CREATE TABLE transactions (
    transaction_id BIGSERIAL PRIMARY KEY,
    amount DECIMAL(10,2)
);

-- Foreign keys: match the referenced column's type
CREATE TABLE line_items (
    line_item_id UUID PRIMARY KEY,
    transaction_id UUID REFERENCES transactions(transaction_id),
    product_id UUID REFERENCES products(product_id)
);
-- Index on frequently queried columns
CREATE INDEX idx_orders_customer_id ON orders(customer_id);
CREATE INDEX idx_orders_order_date ON orders(order_date);
-- Index for incremental sync (updated_at)
CREATE INDEX idx_customers_updated_at ON customers(updated_at DESC);
-- Composite index for common query patterns
CREATE INDEX idx_orders_customer_date ON orders(customer_id, order_date);
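To illustrate how a consumer might use the updated_at index for incremental sync, here is a sketch of the read side; last_synced_at is a hypothetical bookmark the consumer stores between runs:

import psycopg2

conn = psycopg2.connect(
    "postgresql://org123_admin:password@staging-db.elementrix.io:5432/staging_org123?sslmode=require"
)
cur = conn.cursor()

# Fetch only rows changed since the last sync; the updated_at index keeps the
# range scan cheap on large tables.
last_synced_at = "2024-01-01 00:00:00"  # hypothetical bookmark value
cur.execute(
    "SELECT customer_id, name, email, updated_at FROM customers "
    "WHERE updated_at > %s ORDER BY updated_at",
    (last_synced_at,),
)
changed_rows = cur.fetchall()
cur.close()
conn.close()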
-- Add constraints to ensure data quality
CREATE TABLE validated_data (
    id UUID PRIMARY KEY,
    email VARCHAR(200) NOT NULL CHECK (email ~* '^[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}$'),
    age INT CHECK (age >= 0 AND age <= 150),
    amount DECIMAL(10,2) CHECK (amount >= 0),
    status VARCHAR(20) CHECK (status IN ('pending', 'completed', 'failed'))
);
✅ No Custom Elementrix Code Required
✅ Standard PostgreSQL Tooling
✅ Full Control Over Transformations
✅ Supports Any Data Source
✅ Familiar Workflow for Data Teams
✅ Incremental Sync Support (via an updated_at column and index)
⚠️ Manual Data Loading
⚠️ Storage Limits
⚠️ Performance
⚠️ Data Freshness