finance-parser: changeset 3:2e5f3664f3e4

summary:   documents analyzer almost finished
author:    Dennis C. M. <dennis@denniscm.com>
date:      Fri, 02 Jun 2023 20:12:29 +0100
parents:   ef8a4d95755a
children:  9005b7590008
files:     .idea/finance-parser.iml
           analyze_document/__init__.py
           analyze_document/app.py
           analyze_document/requirements.txt
           events/analyze_document_event.json
           events/process_document_event.json
           events/upload_document_event.json
           process_document/app.py
           statemachine/statemachine.asl.json
           template.yaml
           upload_document/__init__.py
           upload_document/app.py
           upload_document/requirements.txt
diffstat:  11 files changed, 357 insertions(+), 102 deletions(-)
--- a/.idea/finance-parser.iml	Thu Jun 01 18:51:18 2023 +0100
+++ b/.idea/finance-parser.iml	Fri Jun 02 20:12:29 2023 +0100
@@ -21,7 +21,7 @@
         <option value="AUTO_EXPAND" />
       </list>
     </option>
-    <option name="stackName" value="FinanceParser" />
+    <option name="stackName" value="Denniscm-FinanceParser" />
     <option name="tags">
       <map />
     </option>
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/analyze_document/app.py	Fri Jun 02 20:12:29 2023 +0100
@@ -0,0 +1,44 @@
+import json
+import boto3
+import uuid
+import re
+
+
+textract_client = boto3.client('textract')
+s3_client = boto3.client('s3')
+
+
+def lambda_handler(event, context):
+    event_detail = event['detail']
+    bucket_name = event_detail['bucket']['name']
+    object_key = event_detail['object']['key']
+    company_ticker = re.search('unprocessed/(.*).pdf', object_key).group(1)
+
+    data_dict = textract_client.analyze_document(
+        Document={'S3Object': {'Bucket': bucket_name, 'Name': object_key}},
+        FeatureTypes=['TABLES']
+    )
+
+    data_string = json.dumps(data_dict, indent=2, default=str)
+    filename = f'{company_ticker}_{uuid.uuid4()}.json'
+
+    s3_client.put_object(
+        Bucket=bucket_name,
+        Key=f'analyzed/{filename}',
+        Body=data_string
+    )
+
+    s3_client.delete_object(
+        Bucket=bucket_name,
+        Key=object_key
+    )
+
+    return {
+        "statusCode": 200,
+        "body": {
+            "message": {
+                "objectKey": f'analyzed/{filename}',
+                "bucketName": bucket_name
+            }
+        },
+    }
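The new handler consumes the EventBridge "Object Created" payload directly (event['detail']), runs Textract synchronously, and moves the result under analyzed/. A minimal local smoke test, assuming AWS credentials with S3 and Textract access and the sample event committed below in events/analyze_document_event.json:

# Hypothetical local driver (not part of this changeset): replays the
# sample EventBridge event against the handler, exactly as the state
# machine would deliver it. Run from the repository root.
import json

from analyze_document.app import lambda_handler

with open('events/analyze_document_event.json') as f:
    event = json.load(f)

# The handler never touches `context`, so passing None is safe here.
print(lambda_handler(event, None))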
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/analyze_document/requirements.txt	Fri Jun 02 20:12:29 2023 +0100
@@ -0,0 +1,1 @@
+boto3
\ No newline at end of file
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/events/analyze_document_event.json	Fri Jun 02 20:12:29 2023 +0100
@@ -0,0 +1,28 @@
+{
+   "version":"0",
+   "id":"434ad981-1cce-8857-e57c-10039ddf700a",
+   "detail-type":"Object Created",
+   "source":"aws.s3",
+   "account":"587328694482",
+   "time":"2023-06-02T16:15:26Z",
+   "region":"eu-central-1",
+   "resources":[
+      "arn:aws:s3:::sandbox-finance-parser-data"
+   ],
+   "detail":{
+      "version":"0",
+      "bucket":{
+         "name":"sandbox-finance-parser-data"
+      },
+      "object":{
+         "key":"unprocessed/san.pdf",
+         "size":49856,
+         "etag":"0adc595c8f2dbfabb5c4095f1f91b458",
+         "sequencer":"00647A159E6438B1A6"
+      },
+      "request-id":"47GMMWBYT14BGH15",
+      "requester":"587328694482",
+      "source-ip-address":"88.25.226.176",
+      "reason":"PutObject"
+   }
+}
\ No newline at end of file
--- a/events/process_document_event.json	Thu Jun 01 18:51:18 2023 +0100
+++ b/events/process_document_event.json	Fri Jun 02 20:12:29 2023 +0100
@@ -1,38 +1,9 @@
 {
-   "Records":[
-      {
-         "eventVersion":"2.1",
-         "eventSource":"aws:s3",
-         "awsRegion":"eu-central-1",
-         "eventTime":"2023-06-01T16:53:50.860Z",
-         "eventName":"ObjectCreated:Put",
-         "userIdentity":{
-            "principalId":"AWS:AROAYRP4EVDJOMTEWOIXJ:dennis"
-         },
-         "requestParameters":{
-            "sourceIPAddress":"88.25.226.176"
-         },
-         "responseElements":{
-            "x-amz-request-id":"X1HS8KY4ZX3GBSCD",
-            "x-amz-id-2":"AE2BzGU/+Dk0x2lsYhw6b8h2Ha67cxSK/hsI0NIRnjP9/UePvBfYS4GabPgzpdd6JSM6LYSLJvjDDFeOfES5ip1dtfsPSw5G"
-         },
-         "s3":{
-            "s3SchemaVersion":"1.0",
-            "configurationId":"f0a23387-f41d-4d90-b2cb-f16f2c61c5ab",
-            "bucket":{
-               "name":"sandbox-finance-parser-data",
-               "ownerIdentity":{
-                  "principalId":"A2WI146QA2L7B1"
-               },
-               "arn":"arn:aws:s3:::sandbox-finance-parser-data"
-            },
-            "object":{
-               "key":"balance_sheet.pdf",
-               "size":49856,
-               "eTag":"0adc595c8f2dbfabb5c4095f1f91b458",
-               "sequencer":"006478CD1EC996352F"
-            }
-         }
+   "statusCode": 200,
+   "body": {
+      "message": {
+         "objectKey": "analyzed/san_f0799678-a362-4b7f-9fff-c26b0bbf2b15.json",
+         "bucketName": "sandbox-finance-parser-data"
       }
-   ]
+   }
 }
\ No newline at end of file
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/events/upload_document_event.json	Fri Jun 02 20:12:29 2023 +0100
@@ -0,0 +1,9 @@
+{
+   "statusCode": 200,
+   "body": {
+      "message": {
+         "objectKey": "processed/san_d7312109-9099-4dd2-a984-55768641b25e.json",
+         "bucketName": "sandbox-finance-parser-data"
+      }
+   }
+}
\ No newline at end of file
--- a/process_document/app.py	Thu Jun 01 18:51:18 2023 +0100
+++ b/process_document/app.py	Fri Jun 02 20:12:29 2023 +0100
@@ -3,80 +3,95 @@
 from datetime import datetime
 from collections import defaultdict
 
+
 s3_client = boto3.client('s3')
-textract_client = boto3.client('textract')
 
 
 def lambda_handler(event, context):
-    for record in event['Records']:
-        metadata = record['s3']
-        bucket_name = metadata['bucket']['name']
-        object_key = metadata['object']['key']
+    event_message = event['body']['message']
+    object_key = event_message['objectKey']
+    bucket_name = event_message['bucketName']
+
+    # Download file from s3
+    s3_client.download_file(bucket_name, object_key, '/tmp/document.json')
 
-        doc = textract_client.analyze_document(
-            Document={'S3Object': {'Bucket': bucket_name, 'Name': object_key}},
-            FeatureTypes=['TABLES']
-        )
+    with open('/tmp/document.json') as f:
+        doc = json.load(f)
 
-        # Analyze document
-        result = defaultdict(dict)
-        blocks = doc['Blocks']
+    # Analyze document
+    result = defaultdict(dict)
+    blocks = doc['Blocks']
 
-        # Get format
-        lines = filter_blocks(blocks, 'BlockType', 'LINE')
-        for line in lines:
-            amount_format = get_format(line['Text'])
-            result['format'] = amount_format
-            if amount_format:
-                break
+    # Get format
+    lines = filter_blocks(blocks, 'BlockType', 'LINE')
+    for line in lines:
+        amount_format = get_format(line['Text'])
+        result['format'] = amount_format
+        if amount_format:
+            break
 
-        # Find dates value and position
-        data = defaultdict(dict)
-        cells = filter_blocks(blocks, 'BlockType', 'CELL')
-        for cell in cells:
-            if not 'Relationships' in cell:
-                continue
+    # Find dates value and position
+    data = defaultdict(dict)
+    cells = filter_blocks(blocks, 'BlockType', 'CELL')
+    for cell in cells:
+        if not 'Relationships' in cell:
+            continue
 
-            child_ids = [r['Ids'] for r in cell['Relationships'] if r['Type'] == 'CHILD'][0]
+        child_ids = [r['Ids'] for r in cell['Relationships'] if r['Type'] == 'CHILD'][0]
+
+        # Get `Text` from `CELL` block
+        cell_text = ''
+        for index, child_id in enumerate(child_ids):
+            word_block = filter_blocks(blocks, 'Id', child_id)[0]
+            cell_text += word_block['Text']
-            # Get `Text` from `CELL` block
-            cell_text = ''
-            for index, child_id in enumerate(child_ids):
-                word_block = filter_blocks(blocks, 'Id', child_id)[0]
-                cell_text += word_block['Text']
+            if index < len(child_ids) - 1:
+                cell_text += '_'
-                if index < len(child_ids) - 1:
-                    cell_text += '_'
+        # Verify if `Text` could be a valid date
+        date_string = is_date(cell_text)
+        if date_string:
+            cell_text = date_string
+            result['dateRow'] = cell['RowIndex']
+            result['dateColumns'][cell['ColumnIndex']] = date_string
-            # Verify if `Text` could be a valid date
-            date_string = is_date(cell_text)
-            if date_string:
-                cell_text = date_string
-                result['dateRow'] = cell['RowIndex']
-                result['dateColumns'][cell['ColumnIndex']] = date_string
+        cell_row_index = cell['RowIndex']
+        cell_column_index = cell['ColumnIndex']
+        data[cell_row_index][cell_column_index] = clean(cell_text)
+
+        try:
+            data[cell_row_index]['type'] = cell['EntityTypes']
+        except KeyError:
+            pass
-            cell_row_index = cell['RowIndex']
-            cell_column_index = cell['ColumnIndex']
-            data[cell_row_index][cell_column_index] = clean(cell_text)
+    # Delete unused row and columns
+    for row_index in list(data.keys()):
+        row = data[row_index]
+        for column_index in list(row.keys()):
+            if column_index not in result['dateColumns'] \
+                    and column_index != 1 and column_index != 'type':
+                del row[column_index]
-        # Delete unused row and columns
-        for row_index in list(data.keys()):
-            if row_index > result['dateRow']:
-                row = data[row_index]
-                for column_index in list(row.keys()):
-                    if column_index not in result['dateColumns'] and column_index != 1:
-                        del row[column_index]
+        if len(row) > 1:
+            result['data'][row_index] = row
+
+    filename = object_key.replace('analyzed/', 'processed/')
+    data_string = json.dumps(result, indent=2, default=str)
-                if len(row) > 1:
-                    result['data'][row_index] = row
-
-    print(f'RESULT: {result}')
+    s3_client.put_object(
+        Bucket=bucket_name,
+        Key=filename,
+        Body=data_string
+    )
 
     return {
         "statusCode": 200,
-        "body": json.dumps({
-            "message": "ok"
-        }),
+        "body": {
+            "message": {
+                "objectKey": filename,
+                "bucketName": bucket_name
+            }
+        },
     }
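The rewritten processor depends on helpers (filter_blocks, get_format, is_date, clean) that live further down in process_document/app.py and are untouched by this hunk. Judging from the call sites, filter_blocks is a simple key/value filter over Textract blocks; an assumed sketch for readers of the diff:

# Assumed shape, inferred from the call sites above; the real
# definition is outside this hunk and may differ.
def filter_blocks(blocks, key, value):
    """Return the Textract blocks whose `key` attribute equals `value`."""
    return [block for block in blocks if block.get(key) == value]

# filter_blocks(blocks, 'BlockType', 'CELL')  -> every table cell
# filter_blocks(blocks, 'Id', child_id)[0]    -> lookup of one WORD block

Note that the Id lookup is a linear scan per child word; indexing blocks by Id in a dict once, up front, would make the cell-text pass linear in the document size.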
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/statemachine/statemachine.asl.json	Fri Jun 02 20:12:29 2023 +0100
@@ -0,0 +1,20 @@
+{
+  "StartAt": "AnalyzeDocumentWithTextract",
+  "States": {
+    "AnalyzeDocumentWithTextract": {
+      "Type": "Task",
+      "Resource": "${AnalyzeDocumentFunctionArn}",
+      "Next": "ProcessDocument"
+    },
+    "ProcessDocument": {
+      "Type": "Task",
+      "Resource": "${ProcessDocumentFunctionArn}",
+      "Next": "UploadDocument"
+    },
+    "UploadDocument": {
+      "Type": "Task",
+      "Resource": "${UploadDocumentFunctionArn}",
+      "End": true
+    }
+  }
+}
\ No newline at end of file
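In this Amazon States Language definition, each Task state receives the previous Lambda's return value as its input. That is why every handler in this changeset returns a {"statusCode", "body": {"message": ...}} envelope and reads event['body']['message']. A sketch of the handoff, using the sample payload from events/process_document_event.json:

# Illustration only: the dict one state returns is the `event` the
# next state's handler receives.
analyze_output = {
    "statusCode": 200,
    "body": {
        "message": {
            "objectKey": "analyzed/san_f0799678-a362-4b7f-9fff-c26b0bbf2b15.json",
            "bucketName": "sandbox-finance-parser-data"
        }
    }
}

# ProcessDocument begins by unpacking exactly this envelope:
message = analyze_output['body']['message']
assert message['objectKey'].startswith('analyzed/')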
--- a/template.yaml	Thu Jun 01 18:51:18 2023 +0100
+++ b/template.yaml	Fri Jun 02 20:12:29 2023 +0100
@@ -14,7 +14,7 @@
     Architectures:
       - x86_64
     Timeout: 20
-    MemorySize: 256
+    MemorySize: 128
     Tracing: Active
 
 Resources:
@@ -25,11 +25,50 @@
         - CreateProdResources
         - finance-parser-data
        - sandbox-finance-parser-data
+      NotificationConfiguration:
+        EventBridgeConfiguration:
+          EventBridgeEnabled: true
 
-  ProcessDocumentFunction:
+  StateMachine:
+    Type: AWS::Serverless::StateMachine
+    Properties:
+      Tracing:
+        Enabled: true
+      DefinitionUri: statemachine/statemachine.asl.json
+      DefinitionSubstitutions:
+        AnalyzeDocumentFunctionArn: !GetAtt AnalyzeDocumentFunction.Arn
+        ProcessDocumentFunctionArn: !GetAtt ProcessDocumentFunction.Arn
+        UploadDocumentFunctionArn: !GetAtt UploadDocumentFunction.Arn
+      Events:
+        StateChange:
+          Type: EventBridgeRule
+          Properties:
+            Pattern:
+              source:
+                - aws.s3
+              detail-type:
+                - Object Created
+              detail:
+                bucket:
+                  name:
+                    - !Ref S3Bucket
+                object:
+                  key:
+                    - "prefix": "unprocessed/"
+    Connectors:
+      StateMachineConnector:
+        Properties:
+          Destination:
+            - Id: AnalyzeDocumentFunction
+            - Id: ProcessDocumentFunction
+            - Id: UploadDocumentFunction
+          Permissions:
+            - Write
+
+  AnalyzeDocumentFunction:
     Type: AWS::Serverless::Function
     Properties:
-      CodeUri: process_document/
+      CodeUri: analyze_document/
       Handler: app.lambda_handler
       Runtime: python3.7
       Policies:
@@ -39,12 +78,21 @@
              Action:
                - textract:AnalyzeDocument
              Resource: "*"
-      Events:
-        NewBalanceSheetEvent:
-          Type: S3
-          Properties:
-            Bucket: !Ref S3Bucket
-            Events: s3:ObjectCreated:*
+    Connectors:
+      S3Connector:
+        Properties:
+          Destination:
+            Id: S3Bucket
+          Permissions:
+            - Read
+            - Write
+
+  ProcessDocumentFunction:
+    Type: AWS::Serverless::Function
+    Properties:
+      CodeUri: process_document/
+      Handler: app.lambda_handler
+      Runtime: python3.7
     Connectors:
       S3Connector:
         Properties:
@@ -52,4 +100,44 @@
            Id: S3Bucket
           Permissions:
             - Read
-            - Write
\ No newline at end of file
+            - Write
+
+  UploadDocumentFunction:
+    Type: AWS::Serverless::Function
+    Properties:
+      CodeUri: upload_document/
+      Handler: app.lambda_handler
+      Runtime: python3.7
+    Connectors:
+      DynamoConnector:
+        Properties:
+          Destination:
+            Id: DynamoTable
+          Permissions:
+            - Write
+      S3Connector:
+        Properties:
+          Destination:
+            Id: S3Bucket
+          Permissions:
+            - Read
+
+  DynamoTable:
+    Type: AWS::DynamoDB::Table
+    Properties:
+      TableName: FinanceParser
+      BillingMode: PAY_PER_REQUEST
+      DeletionProtectionEnabled: !If
+        - CreateProdResources
+        - True
+        - False
+      KeySchema:
+        - AttributeName: pk
+          KeyType: HASH
+        - AttributeName: sk
+          KeyType: RANGE
+      AttributeDefinitions:
+        - AttributeName: pk
+          AttributeType: S
+        - AttributeName: sk
+          AttributeType: S
\ No newline at end of file
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/upload_document/app.py	Fri Jun 02 20:12:29 2023 +0100
@@ -0,0 +1,78 @@
+import json
+import boto3
+import re
+
+s3_client = boto3.client('s3')
+dynamodb = boto3.resource('dynamodb')
+table = dynamodb.Table('FinanceParser')
+
+
+def lambda_handler(event, context):
+    event_message = event['body']['message']
+    object_key = event_message['objectKey']
+    bucket_name = event_message['bucketName']
+    company_ticker = re.search('processed/(.*)_', object_key).group(1)
+
+    # Download file from s3
+    s3_client.download_file(bucket_name, object_key, '/tmp/document.json')
+
+    with open('/tmp/document.json') as f:
+        doc = json.load(f)
+
+    for dateColumn, date in doc['dateColumns'].items():
+        for row_index, account in doc['data'].items():
+
+            try:
+                column_types = account['type']
+            except KeyError:
+                column_types = []
+
+            """
+            The following statement avoids getting a `2020` as the value
+            of `ASSETS`.
+
+            +------------------+------+------+
+            |      ASSETS      | 2020 | 2019 |
+            +------------------+------+------+
+            | ASSETS_ACCOUNT_1 |      |      |
+            +------------------+------+------+
+            | ASSETS_ACCOUNT_2 |      |      |
+            +------------------+------+------+
+            """
+
+            account_value = account[dateColumn]
+            if 'COLUMN_HEADER' in column_types and date == account_value:
+                account_value = ''
+
+            with table.batch_writer() as batch:
+
+                # pk -> item_type#company_ticker
+                # sk -> date#row_index
+
+                batch.put_item(
+                    Item={
+                        'pk': f'balance#{company_ticker}',
+                        'sk': f'{date}#{row_index}',
+                        'account_name': account['1'],
+                        'account_value': account_value,
+                        'column_types': column_types
+                    }
+                )
+
+        # pk -> item_type#company_ticker
+        # sk -> date
+
+        table.put_item(
+            Item={
+                'pk': f'file#{company_ticker}',
+                'sk': f"{date}",
+                'filename': object_key.replace('processed/', '')
+            }
+        )
+
+    return {
+        "statusCode": 200,
+        "body": json.dumps({
+            "message": "ok"
+        }),
+    }
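The uploader fans one processed document out into many items under a single partition: pk groups by item type and ticker, sk orders by date (plus row index for line items). A read-side sketch under that key layout (not part of this changeset; assumes the table is deployed and credentials are available):

import boto3
from boto3.dynamodb.conditions import Key

dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table('FinanceParser')

# Every balance-sheet line item stored for ticker 'san', across all
# dates; the key layout matches the batch.put_item call above.
response = table.query(
    KeyConditionExpression=Key('pk').eq('balance#san')
)
for item in response['Items']:
    print(item['sk'], item['account_name'], item['account_value'])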