pvsbandi (United States of America)

asked on

Load fixed-width file into Postgres

Greetings,
    We are on an Aurora Postgres 14 server.
My requirement is to schedule a load of a fixed-width file into the Postgres database.
Can someone kindly outline the best way to parse and load it into the DB?
slightwv (Netminder)

I have always dealt with delimited files; never tried fixed width.

It appears the suggested method is to use the copy or \copy psql command to load the data into a staging table, then use SQL to parse it out.

You can read about that method here:
http://www.postgresonline.com/article_pfriendly/157.html

That is probably fine for smaller files, but I can see how it would be a problem if you have millions of rows in the file.

Let us know if you do and we can try to suggest alternatives, but I would probably look to code: Python, or a compiled language like C or one of the .NET languages.
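
To make that staging-table method concrete, here is a minimal sketch with psycopg2. All table and column names, field offsets, and connection details are placeholders, and it assumes the raw lines contain no tab or backslash characters (which COPY's default text format treats specially); \copy from psql would do the equivalent load client-side.

import psycopg2

# All names, offsets, and connection details below are placeholders.
conn = psycopg2.connect(host='your_db_host', dbname='your_db_name',
                        user='your_db_user', password='your_db_password')
with conn, conn.cursor() as cur:
    cur.execute("CREATE TABLE IF NOT EXISTS staging_raw (line text)")
    cur.execute("CREATE TABLE IF NOT EXISTS my_table (col1 text, col2 text, col3 text)")

    # bulk-load each raw line into the single-column staging table
    # (assumes the file contains no tab or backslash characters)
    with open('your_file.txt') as f:
        cur.copy_expert("COPY staging_raw FROM STDIN", f)

    # carve the fixed-width columns out with substr(string, start, length)
    cur.execute("""
        INSERT INTO my_table (col1, col2, col3)
        SELECT trim(substr(line,  1, 10)),
               trim(substr(line, 11, 10)),
               trim(substr(line, 21, 10))
        FROM staging_raw
    """)
conn.close()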
pvsbandi (Asker)

Thank you! I read about this alternative before, but my file is going to be huge, so I'm looking for another way.
Any sample using shell scripting or Python would be a great help.
I've added the Python Topic Area.

Can you define huge?  That is a pretty subjective term.

The copy command is VERY efficient at what it does and, in my testing, will always outperform other options.

I'm far from a Python expert, so I would have to Google around for you.  You can use the psycopg2 library to connect to a Postgres database; then all you need to find is the Python code to read a line and parse it.

Shortcut might be using Python or bash to quickly convert the fixed width into a csv and still use the copy command to load directly into the table.

Depending on your system, you might be able to use named pipes so one process converts the data to a csv and another uses psql and copy to load it.
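
A rough sketch of that named-pipe idea (Unix-only, since it uses os.mkfifo; the table name, field offsets, file name, and connection flags are placeholders, and it assumes psql can authenticate non-interactively, e.g. via .pgpass or PGPASSWORD):

import os
import subprocess
import tempfile

# Hypothetical fixed-width layout: three 10-character fields.
fields = [(0, 10), (10, 20), (20, 30)]
pipe_path = os.path.join(tempfile.mkdtemp(), 'fwf_pipe')
os.mkfifo(pipe_path)

# Start psql first; its \copy blocks until something writes to the pipe.
psql = subprocess.Popen([
    'psql', '-h', 'your_db_host', '-d', 'your_db_name', '-U', 'your_db_user',
    '-c', f"\\copy my_table from '{pipe_path}'",
])

# Convert fixed width to tab-separated text and stream it through the pipe,
# so the reformat and the load run as two concurrent processes.
with open('your_file.txt') as src, open(pipe_path, 'w') as pipe:
    for line in src:
        pipe.write('\t'.join(line[s:e].strip() for s, e in fields) + '\n')

psql.wait()
os.remove(pipe_path)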
Yes, the COPY command is impressively fast. But the second step, parsing based on substrings, is what I'm concerned about.
I have no knowledge of Python but will look for any examples I could make use of.
Hi,

something like:

import psycopg2

# database connection parameters
db_host = 'your_db_host'
db_port = 'your_db_port'
db_name = 'your_db_name'
db_user = 'your_db_user'
db_pass = 'your_db_password'

# file to load
filename = 'your_file.txt'

# define the fixed-width fields
fields = [(0, 10), (10, 20), (20, 30)]

# create a database connection
try:
    conn = psycopg2.connect(host=db_host, port=db_port, dbname=db_name, user=db_user, password=db_pass)
except psycopg2.Error as e:
    print(f"Unable to connect to the database: {e}")
    exit()

# create a cursor
cur = conn.cursor()

# create a table for the data
try:
    cur.execute("CREATE TABLE IF NOT EXISTS my_table (col1 text, col2 text, col3 text)")
except psycopg2.Error as e:
    print(f"Unable to create table: {e}")
    exit()

# open the file and read the lines
with open(filename, 'r') as f:
    lines = f.readlines()

# loop through the lines, parse each one, and insert it one row at a time
try:
    for line in lines:
        # parse the line into the fixed-width fields
        data = [line[start:end].strip() for start, end in fields]

        # insert the parsed row into the table
        cur.execute("INSERT INTO my_table (col1, col2, col3) VALUES (%s, %s, %s)", data)
    conn.commit()
except psycopg2.Error as e:
    print(f"Unable to insert data: {e}")
    conn.rollback()

# close the cursor and database connection
cur.close()
conn.close()




Cheers
I predict individual insert statements 1 row at a time will NEVER perform well.

I've never used this, but it appears psycopg2 has a copy command similar to psql's:
https://www.psycopg.org/psycopg3/docs/basic/copy.html

Again, I've never used it, so I'm not sure how it performs compared to psql's copy, but it has to be better than single-row inserts.
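
For reference, the row-by-row flavour of that copy interface from the link looks roughly like this (note this is the newer psycopg 3 package, not psycopg2; the table, columns, file name, offsets, and connection string are placeholders):

import psycopg

# Hypothetical fixed-width layout and connection string.
fields = [(0, 10), (10, 20), (20, 30)]

with psycopg.connect("host=your_db_host dbname=your_db_name "
                     "user=your_db_user password=your_db_password") as conn:
    with conn.cursor() as cur:
        # stream the parsed rows straight into COPY, no temp file needed
        with cur.copy("COPY my_table (col1, col2, col3) FROM STDIN") as copy:
            with open('your_file.txt') as f:
                for line in f:
                    copy.write_row([line[s:e].strip() for s, e in fields])
# the connection context manager commits on a clean exit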
Hi,

I thought about that too when I replied, so here's an updated version using the psycopg2 copy command instead of the slow inserts.  Thanks for mentioning that.

import psycopg2
import csv

# database connection parameters
db_host = 'your_db_host'
db_port = 'your_db_port'
db_name = 'your_db_name'
db_user = 'your_db_user'
db_pass = 'your_db_password'

# file to load
filename = 'your_file.txt'

# define the fixed-width fields
fields = [(0, 10), (10, 20), (20, 30)]

# create a database connection
try:
    conn = psycopg2.connect(host=db_host, port=db_port, dbname=db_name, user=db_user, password=db_pass)
except psycopg2.Error as e:
    print(f"Unable to connect to the database: {e}")
    exit()

# create a cursor
cur = conn.cursor()

# create a table for the data
try:
    cur.execute("CREATE TABLE IF NOT EXISTS my_table (col1 text, col2 text, col3 text)")
except psycopg2.Error as e:
    print(f"Unable to create table: {e}")
    exit()

# open the file and read the lines
with open(filename, 'r') as f:
    lines = f.readlines()

# create a CSV writer to write the parsed data to a temporary file
with open('temp_file.csv', 'w', newline='') as csvfile:
    writer = csv.writer(csvfile)
    
    # loop through the lines and parse them into the fixed-width fields
    for line in lines:
        data = [line[start:end].strip() for start, end in fields]
        
        # write the parsed data to the temporary file
        writer.writerow(data)

# use the COPY command to load the data from the temporary file into the database table
# (copy_expert with FORMAT csv so that quoted fields written by csv.writer are handled)
with open('temp_file.csv', 'r') as csvfile:
    cur.copy_expert("COPY my_table FROM STDIN WITH (FORMAT csv)", csvfile)

# commit the changes and close the cursor and database connection
conn.commit()
cur.close()
conn.close()




Cheers
Still uses a temp file on disk.  That is sort of what I suggested as a possible workaround: make it a CSV first and still use the psql copy command.  I'm betting it isn't going to be any faster than parsing it with SQL after loading it into a staging table, but it's probably worth testing to confirm.

From the link I posted, I'm thinking you can read a line, parse/split it up, and copy it in a single loop without generating a CSV file first.  If that is possible, it would be MUCH more efficient.
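
One way to do that with psycopg2 (a sketch only; the adapter class, file name, offsets, and table/column names are all made up here) is a small file-like wrapper that converts each fixed-width line to tab-separated text as COPY pulls data from it, so nothing is materialised on disk or held fully in memory:

import psycopg2

# Hypothetical layout: three 10-character fields.
FIELDS = [(0, 10), (10, 20), (20, 30)]

class FixedWidthSource:
    """File-like adapter: converts fixed-width lines to tab-separated text
    on the fly, so COPY can stream the file without an intermediate CSV."""

    def __init__(self, path, fields):
        self._file = open(path, 'r')
        self._fields = fields
        self._buffer = ''

    def readline(self):
        raw = self._file.readline()
        if not raw:
            return ''
        return '\t'.join(raw[s:e].strip() for s, e in self._fields) + '\n'

    def read(self, size=-1):
        # accumulate converted lines until we can hand back `size` characters
        while size < 0 or len(self._buffer) < size:
            line = self.readline()
            if not line:
                break
            self._buffer += line
        if size < 0:
            out, self._buffer = self._buffer, ''
        else:
            out, self._buffer = self._buffer[:size], self._buffer[size:]
        return out

    def close(self):
        self._file.close()

conn = psycopg2.connect(host='your_db_host', dbname='your_db_name',
                        user='your_db_user', password='your_db_password')
source = FixedWidthSource('your_file.txt', FIELDS)
try:
    with conn, conn.cursor() as cur:
        cur.copy_expert("COPY my_table (col1, col2, col3) FROM STDIN", source)
finally:
    source.close()
    conn.close()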
Hi,

yes I think you're right about that!  I'll get back a bit later with an updated version (yet again).

Cheers
ASKER CERTIFIED SOLUTION from dfke (member-only content not shown).
SOLUTION (member-only content not shown).
Wow! That's great! Thank you both for your valuable inputs.
The file comes in monthly and has close to 5 million records.
What about the number of columns and the record length in the file?
Any data type conversions like string dates to dates or to numbers, etc?

Doubt it will make much of a difference in overall performance but it would be nice to scale up a test case based on more realistic data.
It is insurance data and the file has about 10 fields:
2 date fields, 5 string fields, 1 int field, and 2 numeric.
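
If the staging-table route is used, the type conversions could happen in the same INSERT ... SELECT. Everything below is illustrative only: the column names, widths, and the YYYYMMDD date layout are assumptions, and it presumes a staging_raw(line text) table has already been loaded as sketched earlier.

import psycopg2

# All column names, widths, and the YYYYMMDD date format are assumptions.
conn = psycopg2.connect(host='your_db_host', dbname='your_db_name',
                        user='your_db_user', password='your_db_password')
with conn, conn.cursor() as cur:
    cur.execute("""
        CREATE TABLE IF NOT EXISTS claims (
            member_id  text, plan_code text, provider text, state text, note text,
            claim_cnt  int,  paid_amt  numeric(12,2), billed_amt numeric(12,2),
            svc_from   date, svc_to    date
        )
    """)
    # cast while carving the fields out of the raw staging rows
    cur.execute("""
        INSERT INTO claims
        SELECT trim(substr(line,   1, 10)),
               trim(substr(line,  11,  5)),
               trim(substr(line,  16, 20)),
               trim(substr(line,  36,  2)),
               trim(substr(line,  38, 30)),
               trim(substr(line,  68,  4))::int,
               trim(substr(line,  72, 10))::numeric,
               trim(substr(line,  82, 10))::numeric,
               to_date(substr(line,  92, 8), 'YYYYMMDD'),
               to_date(substr(line, 100, 8), 'YYYYMMDD')
        FROM staging_raw
    """)
conn.close()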
Record length in the flat file?  20 characters, 50, 500?

Can you provide an example row, character offsets and table definition?

Then we can set up an exact test case.

Granted I'll do what I did before and generate 5 million of the exact same row but that really shouldn't matter.

One more question:
Commits: should you load or reject the entire file, or can you commit parts of it?

If you look at dfke's last example, he commits every 1000 rows per his comment.  So, if one row in the middle fails to convert, you've committed the rows before it.
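
For what a commit-every-N-rows loop can look like (a sketch only, not dfke's member-only code; the table name, offsets, batch size, and connection details are placeholders), note the trade-off: batches that already committed stay in the table even if a later batch fails.

import io
import itertools
import psycopg2

FIELDS = [(0, 10), (10, 20), (20, 30)]   # hypothetical offsets
BATCH = 1000                             # commit every 1000 rows

conn = psycopg2.connect(host='your_db_host', dbname='your_db_name',
                        user='your_db_user', password='your_db_password')
cur = conn.cursor()

with open('your_file.txt') as f:
    while True:
        lines = list(itertools.islice(f, BATCH))
        if not lines:
            break
        rows = '\n'.join(
            '\t'.join(line[s:e].strip() for s, e in FIELDS) for line in lines
        )
        try:
            cur.copy_from(io.StringIO(rows), 'my_table')
            conn.commit()                # rows loaded so far stay committed
        except psycopg2.Error as e:
            conn.rollback()              # only the failing batch is lost
            print(f"Batch of {len(lines)} rows failed: {e}")

cur.close()
conn.close()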
This is the first time we are going to get this file. I don't have this file handy, but will try to construct a sample data line tomorrow.
Yes, committing once every 1000 records is ideal.
I don't know if this will be efficient/quick, but you could try using pandas read_fwf to get the data, SQLAlchemy to connect to the database, then pandas to_sql to write it.
import pandas as pd
from sqlalchemy import create_engine

CONNSTRING = 'postgresql://user:password@localhost:5432/test'
  
db = create_engine(CONNSTRING)

conn = db.connect()

# cur = conn.cursor()

# cur.execute(
#     """
#     CREATE TABLE IF NOT EXISTS my_table (col1 text, col2 text, col3 text, col4 text, col5 text)

# """
# )
# cur.commit()

df = pd.read_fwf('data.txt', colspecs=[(0,5), (5,10), (10, 15), (15, 20), (20, 25)], names=['col1', 'col2', 'col3', 'col4', 'col5'])

print(df)

df.to_sql('my_table', con=conn, if_exists='append', index=False)

conn.close()


A great thanks to you all for your involvement, the passion behind your analysis, and the constant urge to improve on the solutions.
I'm really very thankful to all of you!

 
Can I ask why you accepted code that seems to break somewhere between 1 and 2 million rows when you will have over 5 million to process?
Hi,

My code could break because the COPY command is encountering an invalid byte sequence for encoding 'UTF8', which means that the data being read from the file contains characters that cannot be represented in UTF-8 encoding.

Updated to use the chardet library to detect the file's encoding. Also added psycopg2.extras.DictCursor for the cursor to reduce memory usage, and it now uses a context manager (with statement) for the database connection and cursor, which should ensure they are closed properly even in the event of an error. And yes, I'm aware that detecting the encoding adds some overhead.


import psycopg2
import psycopg2.extras
import io
import chardet

# database connection parameters
db_host = 'your_db_host'
db_port = 'your_db_port'
db_name = 'your_db_name'
db_user = 'your_db_user'
db_pass = 'your_db_password'

# file to load
filename = 'your_file.txt'

# define the fixed-width fields
fields = [(0, 10), (10, 20), (20, 30)]

# create a context manager for the database connection and cursor
class Database:
    def __enter__(self):
        try:
            self.conn = psycopg2.connect(host=db_host, port=db_port, dbname=db_name, user=db_user, password=db_pass)
        except psycopg2.Error as e:
            print(f"Unable to connect to the database: {e}")
            raise
        self.cur = self.conn.cursor(cursor_factory=psycopg2.extras.DictCursor)
        return self.cur
    
    def __exit__(self, exc_type, exc_val, exc_tb):
        self.conn.commit()
        self.cur.close()
        self.conn.close()

# create a table for the data
with Database() as cur:
    try:
        cur.execute("CREATE TABLE IF NOT EXISTS my_table (col1 text, col2 text, col3 text)")
    except psycopg2.Error as e:
        print(f"Unable to create table: {e}")
        raise

# open the file and stream the lines
with open(filename, 'rb') as f:
    # detect the encoding of the file
    file_encoding = chardet.detect(f.read())['encoding']
    f.seek(0)

    # read the first line to skip header if needed
    next(f)
    # create a buffer to hold the data for COPY
    buffer = io.StringIO()
    for line in f:
        # decode the line using the detected encoding
        line = line.decode(file_encoding)
        # parse the line into the fixed-width fields
        data = [line[start:end].strip() for start, end in fields]
        # write the data to the buffer for COPY
        buffer.write('\t'.join(data) + '\n')
        # flush the buffer to the database once it holds roughly 10 MB of data
        if buffer.tell() >= 1024 * 1024 * 10:
            buffer.seek(0)
            with Database() as cur:
                cur.copy_from(buffer, 'my_table', sep='\t')
            buffer.truncate(0)
    # copy any remaining data in the buffer to the database
    buffer.seek(0)
    with Database() as cur:
        cur.copy_from(buffer, 'my_table', sep='\t')

print("Data loaded successfully.")




Cheers
>> COPY command is encountering an invalid byte sequence for encoding 'UTF8', which means that the data being read from the file contains characters that cannot be represented in UTF-8 encoding

Can you try with the setup I posted and 2 million rows?

I even opened my 2 million row text file in notepad and made sure I saved as txt with ANSI encoding.  Code still errored.

>>And yes, I'm aware that detecting the encoding might result in some overhead.

Yes, there is and yes, it still errors in my test.

Here is your latest code against my 4 million row test, again the txt file is saved as ANSI.

After 39 seconds, far worse than anything else, it still got a UTF8 error.

 Measure-Command { python .\db_test2.py }
Traceback (most recent call last):
  File "C:\scripts\db_test2.py", line 75, in <module>
    cur.copy_from(buffer, 'my_table', sep='\t')
psycopg2.errors.CharacterNotInRepertoire: invalid byte sequence for encoding "UTF8": 0x00
CONTEXT:  COPY my_table, line 1



Days              : 0
Hours             : 0
Minutes           : 0
Seconds           : 39
Milliseconds      : 198
Ticks             : 391987928
TotalDays         : 0.000453689731481481
TotalHours        : 0.0108885535555556
TotalMinutes      : 0.653313213333333
TotalSeconds      : 39.1987928
TotalMilliseconds : 39198.7928



Might I suggest you attempt to duplicate my test results?

Pretty interesting number there.....  I checked my_table after the error and it has 1048576 rows in it.

Does that number look familiar to you?

I'm betting the error comes from your buffer.tell() call: likely it's not flushing on a full line of text and is grabbing some garbage, causing the UTF8 error.
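
For what it's worth, another possible culprit (an assumption, not verified against the accepted member-only code) is the buffer.truncate(0) in the last posted version: io.StringIO.truncate(0) does not move the stream position, so the next write lands at the old offset and Python pads the gap with NUL characters, which COPY then rejects as 0x00. A tiny standalone demonstration:

import io

# truncate(0) alone leaves the position at the end of the old contents,
# so the next write pads the gap with NUL ('\x00') characters
buf = io.StringIO()
buf.write('first batch\n')
buf.truncate(0)
buf.write('second batch\n')
print(repr(buf.getvalue()))   # '\x00\x00...\x00second batch\n'

# the safe reset: rewind first, then truncate
buf = io.StringIO()
buf.write('first batch\n')
buf.seek(0)
buf.truncate(0)
buf.write('second batch\n')
print(repr(buf.getvalue()))   # 'second batch\n'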