86 lines
2.4 KiB
Python
86 lines
2.4 KiB
Python
import psycopg2
|
|
import os
|
|
|
|
class Postgres:
    """Small convenience wrapper around a single psycopg2 connection.

    The constructor stores the connection settings, opens the connection and
    immediately loads the schema of the ``public`` schema into ``tables`` /
    ``tableschema``.

    Every query helper accepts an optional ``params`` argument that is
    forwarded to the cursor, so values can be bound by the driver instead of
    being formatted into the SQL text. When a statement raises, the current
    transaction is rolled back before the exception is re-raised so the
    connection stays usable for the next call.
    """

    def __init__(self, ip, port, username, password, database):
        """Store the connection settings, connect, and load the schema.

        Args:
            ip: Host passed to ``psycopg2.connect`` as ``host``.
            port: Port passed to ``psycopg2.connect``.
            username: User passed to ``psycopg2.connect`` as ``user``.
            password: Password passed to ``psycopg2.connect``.
            database: Database name passed to ``psycopg2.connect``.
        """
        self.db_ip = ip
        self.db_port = port
        self.db_username = username
        self.db_password = password
        self.db_name = database

        self.conn = self._connect()
        # Names of the tables found in the 'public' schema.
        self.tables = []
        # Maps table name -> list of (column_name, data_type) rows.
        self.tableschema = {}
        self.get_schema()

    def __str__(self):
        return f'PostgreSQL Server: {self.db_ip}:{self.db_port} as {self.db_username} on database {self.db_name}'

    def __enter__(self):
        """Allow ``with Postgres(...) as db:``; returns the instance itself."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Close the connection on leaving the ``with`` block."""
        self.close()
        return False  # never suppress the caller's exception

    def _connect(self):
        """Open a new connection from the stored settings."""
        return psycopg2.connect(
            host=self.db_ip,
            port=self.db_port,
            user=self.db_username,
            password=self.db_password,
            database=self.db_name,
        )

    def _rollback_quietly(self):
        """Roll back the current transaction, ignoring rollback failures.

        Used only while another exception is already propagating: a failing
        rollback (e.g. on a dead connection) must not mask the original error.
        """
        try:
            self.conn.rollback()
        except Exception:
            pass

    def _query(self, query, params, consume):
        """Run ``query`` and return ``consume(cursor)``.

        The cursor is closed on every path; on failure the transaction is
        rolled back and the exception is re-raised unchanged.
        """
        try:
            with self.conn.cursor() as cur:
                cur.execute(query, params)
                return consume(cur)
        except Exception:
            self._rollback_quietly()
            raise

    def get_schema(self) -> dict:
        """Reload ``tables`` and ``tableschema`` from ``information_schema``.

        Both attributes are rebuilt from scratch, so calling this repeatedly
        does not accumulate duplicate or stale table names.

        Returns:
            ``self.tableschema``: a dict mapping each table name in the
            ``public`` schema to the list of ``(column_name, data_type)`` rows
            returned for it.
        """
        rows = self.fetchall(
            "SELECT table_name FROM information_schema.tables "
            "WHERE table_schema = 'public';"
        )
        tables = [row[0] for row in rows]

        schema = {}
        for table in tables:
            # The table name is bound as a parameter (names containing quotes
            # would otherwise break the statement), and the lookup is limited
            # to 'public' so a same-named table in another schema is not
            # mixed in.
            schema[table] = self.fetchall(
                "SELECT column_name, data_type FROM information_schema.columns "
                "WHERE table_schema = 'public' AND table_name = %s "
                "ORDER BY ordinal_position;",
                (table,),
            )

        self.tables = tables
        self.tableschema = schema
        return self.tableschema

    def fetchall(self, query: str, params=None):
        """Execute ``query`` and return all result rows.

        Args:
            query: SQL text, optionally containing driver placeholders.
            params: Optional values bound to the placeholders.
        """
        return self._query(query, params, lambda cur: cur.fetchall())

    def fetchone(self, query: str, params=None):
        """Execute ``query`` and return the cursor's ``fetchone()`` result.

        Args:
            query: SQL text, optionally containing driver placeholders.
            params: Optional values bound to the placeholders.
        """
        return self._query(query, params, lambda cur: cur.fetchone())

    def fetchmany(self, query: str, size: int, params=None):
        """Execute ``query`` and return the cursor's ``fetchmany(size)`` result.

        Args:
            query: SQL text, optionally containing driver placeholders.
            size: Number of rows requested from the cursor.
            params: Optional values bound to the placeholders.
        """
        return self._query(query, params, lambda cur: cur.fetchmany(size))

    def execute(self, query: str, params=None):
        """Execute a statement and commit it.

        Args:
            query: SQL text, optionally containing driver placeholders.
            params: Optional values bound to the placeholders.

        On failure the transaction is rolled back and the error re-raised.
        """
        try:
            with self.conn.cursor() as cur:
                cur.execute(query, params)
            self.conn.commit()
        except Exception:
            self._rollback_quietly()
            raise

    def executemany(self, query: str, values):
        """Execute ``query`` once per entry of ``values`` and commit.

        Args:
            query: SQL text containing driver placeholders.
            values: Sequence of parameter sets, passed to ``cursor.executemany``.

        On failure the transaction is rolled back and the error re-raised.
        """
        try:
            with self.conn.cursor() as cur:
                cur.executemany(query, values)
            self.conn.commit()
        except Exception:
            self._rollback_quietly()
            raise

    def test_connection(self) -> bool:
        """Try to open a fresh connection with the stored settings.

        On success the new connection replaces ``self.conn`` and the previous
        one is closed instead of being abandoned. On failure ``self.conn`` is
        left untouched.

        Returns:
            True if a connection could be opened, False if psycopg2 reported
            an error while connecting.
        """
        try:
            fresh = self._connect()
        except psycopg2.Error:
            return False

        stale = getattr(self, "conn", None)
        self.conn = fresh
        if stale is not None and not stale.closed:
            try:
                stale.close()
            except psycopg2.Error:
                # The old handle is being discarded anyway.
                pass
        return True

    def close(self):
        """Close the underlying connection."""
        self.conn.close()
|
|
|