view NOTES @ 436:f16faa1fdc46 pg-transactions

InsertOrUpdateMetadata function + UpdateAndGetStatistics
author Alain Mazy <am@osimis.io>
date Fri, 15 Dec 2023 17:11:26 +0100
parents 326f8304daa1
children d979f25e60cf
line wrap: on
line source

Resources:
*********
- PG transaction modes explained: https://www.postgresql.org/files/developer/concurrency.pdf
- Isolation level explained (PG + MySQL): https://amirsoleimani.medium.com/understanding-database-isolation-level-via-examples-mysql-and-postgres-a86b5502d404
- Message queuing in PG: https://www.crunchydata.com/blog/message-queuing-using-native-postgresql


Create and delete instances Internals:
*************************************

isNewInstance = CreateInstance(...)

if (!isNewInstance && overwriteInstances)
  DeleteResource(instance)
       -> ClearDeletedFiles(manager);
            DELETE FROM DeletedFiles  ------> this is not a temporary table in postgres but it is supposed to be empty before and after each transaction -> it is empty when taking a snapshot of the DB in READ COMMITTED mode!!!
          ClearDeletedResources(manager);
            DELETE FROM DeletedResources  ------> this is not a temporary table in postgres but it is supposed to be empty before and after each transaction !!!

            DELETE FROM RemainingAncestor  ------> this is not a temporary table in postgres but it is supposed to be empty before and after each transaction !!!
            DELETE FROM Resources WHERE internalId=..
               -> cascade-deletes the MainDicomTags, the Metadata and the AttachedFiles
                  -> this triggers AttachedFileDeletedFunc
                         INSERT INTO DeletedFiles VALUES
                            (old.uuid, old.filetype, old.compressedSize,
                            old.uncompressedSize, old.compressionType,
                            old.uncompressedHash, old.compressedHash);
                        RETURN NULL;
               -> this triggers a SQL trigger: ResourceDeletedFunc
                        INSERT INTO DeletedResources VALUES (old.resourceType, old.publicId);
                        IF EXISTS (SELECT 1 FROM Resources WHERE parentId = old.parentId) THEN
                            -- Signal that the deleted resource has a remaining parent 
                            -- (a parent that must not be deleted but whose LastUpdate must be updated)
                            INSERT INTO RemainingAncestor
                            SELECT resourceType, publicId FROM Resources WHERE internalId = old.parentId;
                        ELSE
                            -- Delete a parent resource when its unique child is deleted 
                            DELETE FROM Resources WHERE internalId = old.parentId;
                        END IF;

            SELECT * FROM RemainingAncestor
              -> SignalRemainingAncestor()  // There is at most 1 remaining ancestor
                 -> ServerIndex::TransactionContext::SignalRemainingAncestor()
                    -> stores remainingType and remainingPublicId (this is used in StatelessDatabaseOperations::DeleteResource to build the REST response of /delete
                                                                   and to update the LastUpdate of all parents (only when deleted from /delete))

          SignalDeletedFiles(output, manager);
            SELECT * FROM DeletedFiles
              -> SignalDeletedAttachment()
                 -> ServerIndex::TransactionContext::SignalAttachmentDeleted()
                    -> pendingFilesToRemove_.push_back(FileToRemove(info))  (files are deleted in CommitFilesToRemove in the ServerIndex::TransactionContext::Commit)

          SignalDeletedResources(output, manager);
            SELECT resourceType, publicId FROM DeletedResources
              -> SignalDeletedResource()
                 -> Emit DeletedResource event (lua)


  if (!CreateInstance(...))
    Error: "No new instance while overwriting; this should not happen"

if isNewInstance -> LogChange
if isNewSeries -> LogChange
....

Sample SQL code that you can execute in DBeaver to test new functions/procedures:

CreateInstance
************************************************************************

-- CreateInstance: makes sure the patient / study / series / instance chain
-- exists in "resources" and reports, for each level, its internal key and
-- whether this call created it.
--
-- Parameters (IN):
--   patient, study, series, instance : public ids of the four levels.
-- Results (OUT):
--   isNewPatient, isNewStudy, isNewSeries, isNewInstance :
--       1 when the row was inserted by this call, 0 when the insert hit a
--       unique_violation (the row was already there).
--   patientKey, studyKey, seriesKey, instanceKey :
--       internalid of the row at each level (new or pre-existing).
--
-- Each level is inserted inside its own BEGIN/EXCEPTION block so that a
-- unique_violation on one level does not abort the others. The key of a
-- freshly inserted row is taken from INSERT ... RETURNING; the lookup SELECT
-- only runs when the row already existed.
CREATE OR REPLACE FUNCTION CreateInstance(
  IN patient TEXT,
  IN study TEXT,
  IN series TEXT,
  IN instance TEXT,
  OUT isNewPatient BIGINT,
  OUT isNewStudy BIGINT,
  OUT isNewSeries BIGINT,
  OUT isNewInstance BIGINT,
  OUT patientKey BIGINT,
  OUT studyKey BIGINT,
  OUT seriesKey BIGINT,
  OUT instanceKey BIGINT)
AS $$
BEGIN
    -- Optimistic defaults: every level is assumed new until an insert fails.
    isNewPatient := 1;
    isNewStudy := 1;
    isNewSeries := 1;
    isNewInstance := 1;

    -- Patient level: resourcetype 0, no parent.
    BEGIN
        INSERT INTO "resources" (internalid, resourcetype, publicid, parentid)
        VALUES (DEFAULT, 0, patient, NULL)
        RETURNING internalid INTO patientKey;
    EXCEPTION
        WHEN unique_violation THEN
            isNewPatient := 0;
            SELECT internalid INTO patientKey
            FROM "resources"
            WHERE publicid = patient AND resourcetype = 0;
    END;

    -- Study level: resourcetype 1, child of the patient.
    BEGIN
        INSERT INTO "resources" (internalid, resourcetype, publicid, parentid)
        VALUES (DEFAULT, 1, study, patientKey)
        RETURNING internalid INTO studyKey;
    EXCEPTION
        WHEN unique_violation THEN
            isNewStudy := 0;
            SELECT internalid INTO studyKey
            FROM "resources"
            WHERE publicid = study AND resourcetype = 1;
    END;

    -- Series level: resourcetype 2, child of the study.
    BEGIN
        INSERT INTO "resources" (internalid, resourcetype, publicid, parentid)
        VALUES (DEFAULT, 2, series, studyKey)
        RETURNING internalid INTO seriesKey;
    EXCEPTION
        WHEN unique_violation THEN
            isNewSeries := 0;
            SELECT internalid INTO seriesKey
            FROM "resources"
            WHERE publicid = series AND resourcetype = 2;
    END;

    -- Instance level: resourcetype 3, child of the series.
    BEGIN
        INSERT INTO "resources" (internalid, resourcetype, publicid, parentid)
        VALUES (DEFAULT, 3, instance, seriesKey)
        RETURNING internalid INTO instanceKey;
    EXCEPTION
        WHEN unique_violation THEN
            isNewInstance := 0;
            SELECT internalid INTO instanceKey
            FROM "resources"
            WHERE publicid = instance AND resourcetype = 3;
    END;

END;
$$ LANGUAGE plpgsql;

-- Manual smoke test for CreateInstance: empties "resources", then creates
-- 'instance1', 'instance2' and 'instance2' again under the same
-- patient/study/series, printing the keys and is_new flags after each call.
DO $$
DECLARE
    outcome record;
    instance_id TEXT;
BEGIN
    -- Deliberately unfiltered: the run starts from an empty table.
    DELETE FROM "resources";

    FOREACH instance_id IN ARRAY ARRAY['instance1', 'instance2', 'instance2'] LOOP
        SELECT * INTO outcome
        FROM CreateInstance('patient1', 'study1', 'series1', instance_id);

        RAISE NOTICE 'Value patientInternalId: %, is_new: %', outcome.patientKey, outcome.isNewPatient;
        RAISE NOTICE 'Value studyInternalId: %, is_new: %', outcome.studyKey, outcome.isNewStudy;
        RAISE NOTICE 'Value seriesInternalId: %, is_new: %', outcome.seriesKey, outcome.isNewSeries;
        RAISE NOTICE 'Value instanceInternalId: %, is_new: %', outcome.instanceKey, outcome.isNewInstance;
        RAISE NOTICE '--------------';
    END LOOP;
END $$;

-- \set patient_key 'patient_key'
-- SELECT CreateInstance('patient', 'study', 'series', 'instance', patient_key) ;

-- drop function CreateInstance
-- select * from "resources";
-- delete from "resources";
-- INSERT INTO "resources" VALUES (DEFAULT, 0, 'patient', NULL)



************************************************************************

In debug, no verbose logs
Orthanc 1.12.1 + PG 5.1 (serializable mode)          : test_concurrent_anonymize_same_study with 4 workers and 10x repeat: 43.957 s
Orthanc mainline + PG mainline (read-committed mode) : test_concurrent_anonymize_same_study with 4 workers and 10x repeat: 15.744 s
                                                       test_concurrent_anonymize_same_study deletion took: 18.8 s

Orthanc 1.12.1 + PG 5.1 (serializable mode)          : test_concurrent_uploads_same_study with 20 workers and 1x repeat: 21.214 s
Orthanc mainline + PG mainline (read-committed mode) : test_concurrent_uploads_same_study with 20 workers and 1x repeat: 6.57 s (with temporary tables)

Orthanc 1.12.1 + PG 5.1 (serializable mode)          : test_upload_delete_same_study_from_multiple_threads with 5 workers and 30x repeat: 23.016 s
Orthanc mainline + PG mainline (read-committed mode) : test_upload_delete_same_study_from_multiple_threads with 5 workers and 30x repeat: 7.129 s 

With Docker with 10 connections SQL:
osimis/orthanc:23.11.0: TIMING test_concurrent_anonymize_same_study with 4 workers and 10x repeat: 25.047 s
osimis/orthanc:current: TIMING test_concurrent_anonymize_same_study with 4 workers and 10x repeat: 8.649 s

osimis/orthanc:23.11.0: TIMING test_concurrent_anonymize_same_study deletion took: 11.807 s
osimis/orthanc:current: TIMING test_concurrent_anonymize_same_study deletion took: 10.620 s

osimis/orthanc:23.11.0: TIMING test_concurrent_uploads_same_study with 20 workers and 1x repeat: 10.736 s
osimis/orthanc:current: TIMING test_concurrent_uploads_same_study with 20 workers and 1x repeat: 3.465 s

osimis/orthanc:23.11.0: TIMING test_upload_delete_same_study_from_multiple_threads with 5 workers and 30x repeat: 11.966 s
osimis/orthanc:current: TIMING test_upload_delete_same_study_from_multiple_threads with 5 workers and 30x repeat: 5.092 s


TODO:
- check RETURNS SETOF, e.g.:
  -- Illustration of a set-returning function: yields every foo row whose fooid equals the argument.
  CREATE FUNCTION getfoo(int)
      RETURNS SETOF foo
      LANGUAGE SQL
  AS $$
      SELECT *
      FROM foo
      WHERE fooid = $1;
  $$;
- have a separate "thread" to increment/decrement statistics because everybody is fighting to modify the GlobalIntegers rows
    ExecuteSetResourcesContentTags(manager, "DicomIdentifiers", "i",      -> make unique + handle on conflicts
                                   countIdentifierTags, identifierTags);

    ExecuteSetResourcesContentTags(manager, "MainDicomTags", "t",         -> make unique + handle on conflicts
                                   countMainDicomTags, mainDicomTags);
    
    ExecuteSetResourcesContentMetadata(manager, HasRevisionsSupport(), countMetadata, metadata);  -> handle on conflicts

-- NOTE(review): unfinished sketch -- this does not compile as written:
--   * a VARIADIC parameter must be declared with an array type; SETOF is not
--     accepted in a parameter declaration;
--   * the body reads my_params and my_sum, neither of which is declared
--     (the parameter is named params);
--   * RETURN my_sum contradicts RETURNS VOID.
-- Presumably the intent is to insert-or-replace a variable number of metadata
-- rows in a single call -- TODO confirm the intended signature and rewrite.
CREATE OR REPLACE FUNCTION InsertOrReplaceMetadata(VARIADIC params SETOF metadata)
RETURNS VOID AS $$
BEGIN

  -- Placeholder loop: walks the argument array and accumulates one column.
  FOR i IN 1..array_length(my_params, 1) LOOP
    my_sum := my_sum + my_params[i].my_column;
  END LOOP;
  RETURN my_sum;
END;
$$ LANGUAGE plpgsql;

#include <postgresql/libpq-fe.h>

#include <cstdlib>
#include <cstring>
#include <iostream>
#include <string>
#include <vector>

int main() {
  PGconn *conn = PQconnectdb("dbname=mydb user=myuser password=mypassword");
  if (PQstatus(conn) != CONNECTION_OK) {
    std::cerr << "Connection to database failed: " << PQerrorMessage(conn) << std::endl;
    PQfinish(conn);
    return 1;
  }

  const char *function_name = "my_function";
  int num_params = 3;
  int params[] = {1, 2, 3};

  char *query = PQescapeLiteral(conn, function_name, strlen(function_name));
  for (int i = 0; i < num_params; i++) {
    char *param = PQescapeLiteral(conn, (const char *)&params[i], sizeof(int));
    query = (char *)realloc(query, strlen(query) + strlen(param) + 1);
    strcat(query, ",");
    strcat(query, param);
    PQfreemem(param);
  }
  strcat(query, ")");

  PGresult *res = PQexec(conn, query);
  if (PQresultStatus(res) != PGRES_TUPLES_OK) {
    std::cerr << "Function call failed: " << PQerrorMessage(conn) << std::endl;
    PQfreemem(query);
    PQclear(res);
    PQfinish(conn);
    return 1;
  }

  int result = atoi(PQgetvalue(res, 0, 0));
  std::cout << "Result: " << result << std::endl;

  PQfreemem(query);
  PQclear(res);
  PQfinish(conn);


- test events generation StableSeries ....
- run tests with Docker locally + in CI
- check https://discourse.orthanc-server.org/t/image-insert-are-too-slow-databse-performance-too-poor-when-using-mysql-mariadb/3820
- PatientAddedFunc contains an IF
- validate upgrade DB from previous Orthanc and from scratch
- test with older version of PG
- In Docker images, re-enable MySQL & ODBC plugins + tests

DONE:
- force the create/update DB transaction to be serializable (not needed: this is handled by POSTGRESQL_LOCK_DATABASE_SETUP)