Multithreaded file upload synchronization

admin

Administrator
Staff member
Currently I am working on a Delphi XE3 client/server application to transfer files (with the Indy FTP components). The client part monitors a folder, gets a list of the files inside, uploads them to the server and deletes the originals. The uploading is done by a separate thread, which processes files one by one. The files can range from 0 to a few thousand and their sizes also vary a lot.

It is a Firemonkey app compiled for both OSX and Windows, so I had to use TThread instead of OmniThreadLibrary, which I preferred. My customer reports that the application randomly freezes. I could not duplicate it, but since I don't have so much experience with TThread, I might have put deadlock condition somewhere. I read quite a lot of examples, but I'm still not sure about some of the multithread specifics.

The app structure is simple:<br>
A timer in the main thread checks the folder and gets information about each file into a record, which goes into a generic TList. This list keeps information about the names of the files, size, the progress, whether the file is completely uploaded or has to be retried. All that is displayed in a grid with progress bars, etc. This list is accessed only by the main thread.
After that the items from the list are sent to the thread by calling the AddFile method (code below). The thread stores all files in a thread-safe queue like this one <a href="http://delphihaven.wordpress.com/2011/05/06/using-tmonitor-2/">http://delphihaven.wordpress.com/2011/05/06/using-tmonitor-2/</a><br>
When the file is uploaded the uploader thread notifies the main thread with a call to Synchronize.<br>
The main thread periodically calls the Uploader.GetProgress method to check the current file progress and display it. This function is not actually thread-safe, but could it cause a deadlock, or only wrong data returned?

What would be a safe and efficient way to do the progress check?

So is this approach OK or I have missed something? How would you do this?<br>
For example I though of making a new thread just to read the folder contents. This means that the TList I use has to be made thread-safe, but it has to be accessed all the time to refresh the displayed info in the GUI grid. Wouldn't all the synchronization just slow down the GUI?

I have posted the simplified code below in case someone wants to look at it. If not, I would be happy to hear some opinions on what I should use in general. The main goals are to work on both OSX and Windows; to be able to display information about all the files and the progress of the current one; and to be responsive regardless of the number and size of the files.

That's the code of the uploader thread. I have removed some of it for easier reading:

Code:
type
  TFileStatus = (fsToBeQueued, fsUploaded, fsQueued);
  TFileInfo = record
    ID: Integer;
    Path: String;
    Size: Int64;
    UploadedSize: Int64;
    Status: TFileStatus;
  end;

  TUploader = class(TThread)
  private
    FTP: TIdFTP;
    fQueue: TThreadedQueue&lt;TFileInfo&gt;;
    fCurrentFile: TFileInfo;
    FUploading: Boolean;
    procedure ConnectFTP;
    function UploadFile(aFileInfo: TFileInfo): String;
    procedure OnFTPWork(ASender: TObject; AWorkMode: TWorkMode; AWorkCount: Int64);
    procedure SignalComplete;
    procedure SignalError(aError: String);
  protected
    procedure Execute; override;
  public
    property Uploading: Boolean read FUploading;
    constructor Create;
    destructor Destroy; override;
    procedure Terminate;
    procedure AddFile(const aFileInfo: TFileInfo);
    function GetProgress: TFileInfo;
  end;

procedure TUploader.AddFile(const aFileInfo: TFileInfo);
begin
  fQueue.Enqueue(aFileInfo);
end;

procedure TUploader.ConnectFTP;
begin
  ...
    FTP.Connect;
end;

constructor TUploader.Create;
begin
  inherited Create(false);
  FreeOnTerminate := false;
  fQueue := TThreadedQueue&lt;TFileInfo&gt;.Create;
  // Create the TIdFTP and set ports and other params
  ...
end;

destructor TUploader.Destroy;
begin
  fQueue.Close;
  fQueue.Free;
  FTP.Free;
  inherited;
end;

// Process the whole queue and inform the main thread of the progress
procedure TUploader.Execute;
var
  Temp: TFileInfo;
begin
  try
    ConnectFTP;
  except
    on E: Exception do
      SignalError(E.Message);
  end;

  // Use Peek instead of Dequeue, because the item should not be removed from the queue if it fails
  while fQueue.Peek(fCurrentFile) = wrSignaled do
    try
      if UploadFile(fCurrentFile) = '' then
      begin
        fQueue.Dequeue(Temp);  // Delete the item from the queue if succesful
        SignalComplete;
      end;
    except
      on E: Exception do
        SignalError(E.Message);
    end;
end;

// Return the current file's info to the main thread. Used to update the progress indicators
function TUploader.GetProgress: TFileInfo;
begin
  Result := fCurrentFile;
end;

// Update the uploaded size for the current file. This information is retrieved by a timer from the main thread to update the progress bar
procedure TUploader.OnFTPWork(ASender: TObject; AWorkMode: TWorkMode; AWorkCount: Int64);
begin
  fCurrentFile.UploadedSize := AWorkCount;
end;

procedure TUploader.SignalComplete;
begin
  Synchronize(
    procedure
    begin
      frmClientMain.OnCompleteFile(fCurrentFile);
    end);
end;

procedure TUploader.SignalError(aError: String);
begin
  try
    FTP.Disconnect;
  except
  end;
  if fQueue.Closed then
    Exit;

  Synchronize(
    procedure
    begin
      frmClientMain.OnUploadError(aError);
    end);
end;

// Clear the queue and terminate the thread
procedure TUploader.Terminate;
begin
  fQueue.Close;
  inherited;
end;

function TUploader.UploadFile(aFileInfo: TFileInfo): String;
begin
  Result := 'Error';
  try
    if not FTP.Connected then
      ConnectFTP;
    FUploading := true;
    FTP.Put(aFileInfo.Path, ExtractFileName(aFileInfo.Path));     
    Result := '';
  finally
    FUploading := false;
  end;
end;

And parts of the main thread that interact with the uploader:

Code:
......
// Main form
    fUniqueID: Integer;  // This is a unique number given to each file, because there might be several with the same names(after one is uploaded and deleted)
    fUploader: TUploader;         // The uploader thread
    fFiles: TList&lt;TFileInfo&gt;;
    fCurrentFileName: String;     // Used to display the progress
    function IndexOfFile(aID: Integer): Integer;    //Return the index of the record inside the fFiles given the file ID
  public
    procedure OnCompleteFile(aFileInfo: TFileInfo);
    procedure OnUploadError(aError: String);
  end;

// This is called by the uploader with Synchronize
procedure TfrmClientMain.OnUploadError(aError: String);
begin
  // show and log the error
end;

// This is called by the uploader with Synchronize
procedure TfrmClientMain.OnCompleteFile(aFileInfo: TFileInfo);
var
  I: Integer;
begin
  I := IndexOfFile(aFileInfo.ID);
  if (I &gt;= 0) and (I &lt; fFiles.Count) then
  begin
    aFileInfo.Status := fsUploaded;
    aFileInfo.UploadedSize := aFileInfo.Size;
    FFiles.Items[I] := aFileInfo;
    Inc(FFilesUploaded);
    TFile.Delete(aFileInfo.Path);
    colProgressImg.UpdateCell(I);
  end;
end;

procedure TfrmClientMain.ProcessFolder;
var
  NewFiles: TStringDynArray;
  I, J: Integer;
  FileInfo: TFileInfo;
begin
    // Remove completed files from the list if it contains more than XX files
    while FFiles.Count &gt; 1000 do
      if FFiles[0].Status = fsUploaded then
      begin
        Dec(FFilesUploaded);
        FFiles.Delete(0);
      end else
        Break;

    NewFiles := TDirectory.GetFiles(WatchFolder, '*.*',TSearchOption.soAllDirectories);
    for I := 0 to Length(NewFiles) - 1 do
    begin
          FileInfo.ID := FUniqueID;
          Inc(FUniqueID);
          FileInfo.Path := NewFiles[I];
          FileInfo.Size := GetFileSizeByName(NewFiles[I]);
          FileInfo.UploadedSize := 0;
          FileInfo.Status := fsToBeQueued;
          FFiles.Add(FileInfo);

      if (I mod 100) = 0 then
      begin
        UpdateStatusLabel;
        grFiles.RowCount := FFiles.Count;
        Application.ProcessMessages;
        if fUploader = nil then
          break;
      end;
    end;

    // Send the new files and resend failed to the uploader thread
    for I := 0 to FFiles.Count - 1 do
      if (FFiles[I].Status = fsToBeQueued) then
      begin
        if fUploader = nil then
          Break;
        FileInfo := FFiles[I];
        FileInfo.Status := fsQueued;
        FFiles[I] := FileInfo;
        SaveDebug(1, 'Add:    ' + ExtractFileName(FFiles[I].Path));
        FUploader.AddFile(FFiles[I]);
      end;
end;

procedure TfrmClientMain.tmrGUITimer(Sender: TObject);
var
  FileInfo: TFileInfo;
  I: Integer;
begin
  if (fUploader = nil) or not fUploader.Uploading then
    Exit;
  FileInfo := fUploader.GetProgress;
  I := IndexOfFile(FileInfo.ID);
  if (I &gt;= 0) and (I &lt; fFiles.Count) then
  begin
    fFiles.Items[I] := FileInfo;
    fCurrentFileName := ExtractFileName(FileInfo.Path);
    colProgressImg.UpdateCell(I);
  end;
end;

function TfrmClientMain.IndexOfFile(aID: Integer): Integer;
var
  I: Integer;
begin
  Result := -1;
  for I := 0 to FFiles.Count - 1 do
    if FFiles[I].ID = aID then
      Exit(I);
end;