How to run a script command in parallel (multi-threading)

@Leo gave me a tip here a while ago, when I asked in passing how I could run a script command once per selected file instead of once for all selected files. That is no problem if you execute an external command and use @async, but @async doesn't work for script commands or inline scripts, including the "rename * to * @script..." trick. And with external programs you cannot limit the number of parallel threads anyway.

And I was looking at Leo's SHA script today. Apart from the 512 MB bug they mention, I still liked the idea of having checksum columns, but I quickly sobered up again after seeing that it runs single-threaded, one file at a time.

Well, we have so many CPU cores and hyperthreading these days, so why not use 100% of the CPU? Putting the two things above together, I quickly concocted a multi-threaded command wrapper with the template MAXCOUNT/N,MAXWAIT/N,COMMAND/K. Now you can select a bunch of files, set MAXCOUNT to X and start hashing in parallel. It would easily work for other user commands as well. It's an ugly hack, but it works; in fact I'm planning to use this for quite a lot of ideas, like putting hash values into my recent favorite, ADS.

There are 2 caveats though:

  • The called command must accept a "RESVAR" parameter and set that variable in Script.Vars before it returns, so that the thread manager can pick the result up (see the minimal sketch just below).
  • If a thread is still running after the timeout is reached, it can neither be killed nor have its result read; it will simply keep running in the background. So the command you call must make sure it doesn't run forever.

This is just a proof of concept and still a hack. I bet you'll have some great ideas for improvements.
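For illustration, here is a minimal command that fulfils the RESVAR contract. 'GetFileSize' is just a made-up example; it would be registered in OnInit via AddCommand with a 'RESVAR/K,FILE/K' template, exactly like CalcSHA256 in the full script below.

// Minimal sketch of a worker-callable command: the only hard requirements are
// the RESVAR parameter and writing the result into Script.vars before returning.
function OnGetFileSize(scriptCmdData) {
	var resvar = scriptCmdData.func.args.RESVAR;
	if (!resvar) return;                               // without RESVAR the manager can never see a result
	var item = DOpus.FSUtil.GetItem(scriptCmdData.func.args.FILE);
	Script.vars.Set(resvar) = item.size + '';          // hand the result back via Script.vars
	// return item.size;                               // would NOT work - script commands can't return values
}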

// MultiThread Test
// (c) 2021 cu
var util = {};
util.cmdGlobal	= DOpus.Create.Command;
util.sv			= Script.vars;
util.dopusrt	= 'dopusrt /acmd ';
/*
	Proof of Concept - Multi-Threaded Commands

	To test this, create a new button as such:

	@nodeselect
	MultiThreadManagerStart MAXCOUNT=8 MAXWAIT=5000 COMMAND "CalcSHA256"

	Basically the command which needs to be run in parallel, "CalcSHA256" in this case,
	must at least have a parameter called
		RESVAR (e.g. cmd.template='RESVAR/K, ...')
	and must set it before returning, e.g.
		Script.vars.Set(resvar) = calculated_hash;
	and the Thread Manager and Thread Workers will take care of the rest.

	The reason why RESVAR is necessary is that there is no possibility for Script Commands
	to directly return a value with standard JS, i.e. 'return myval;' ...that doesn't work.

	And since we run basically everything via 'dopusrt /acmd' anyway,
	the target command "CalcSHA256" is run completely asynchronously in a thread
	from which we would have no possibility to receive the return value.
	Now you know.
*/

// Called by Directory Opus to initialize the script
function OnInit(initData)
{
	initData.name = "MultiThread Test";
	initData.version = "1.0";
	initData.copyright = "(c) 2021 cuneytyilmaz.com";
	initData.desc = "";
	initData.default_enable = true;
	initData.min_version = "12.0";

	var cmd         = initData.AddCommand();
	cmd.name        = 'MultiThreadManagerStart';
	cmd.method      = 'OnMultiThreadManagerStart';
	cmd.template    = 'MAXCOUNT/N,MAXWAIT/N,COMMAND/K';
	cmd.label		= 'Start Multi Threaded Command';
	cmd.desc        = 'not yet';

	var cmd         = initData.AddCommand();
	cmd.name        = 'MultiThreadWorker';
	cmd.method      = 'OnMultiThreadWorker';
	cmd.template    = 'THREADID/K,MAXWAIT/N,CMD/K,FILE/K';
	cmd.label		= 'Start Multi Threaded Command';
	cmd.desc        = 'not yet';

	var cmd         = initData.AddCommand();
	cmd.name        = 'CalcSHA256';
	cmd.method      = 'OnCalcSHA256';
	cmd.template    = 'RESVAR/K,FILE/K';
	cmd.label		= 'Calc SHA-256';
	cmd.desc        = 'not yet';
}


function getTS() {
	return new Date().getTime();
}
function getThreadID(ts) {
	return 't_' + ts + '_' + Math.floor(100 + Math.random() * 899);
}
function getResVar(tid) {
	return 'v_' + tid;
}

function OnMultiThreadManagerStart(scriptCmdData) {
	DOpus.ClearOutput();
	var maxcount= scriptCmdData.func.args.MAXCOUNT;
	var cmd		= scriptCmdData.func.args.COMMAND;
	var maxwait	= scriptCmdData.func.args.MAXWAIT;

	if (!maxwait) {
		// if no max wait given use something else
		maxwait = 60*60*1000; // 1 hour in millisecs
	}
	DOpus.Output('Thread count: ' + maxcount + ', maxwait: ' + maxwait + ', command: ' + cmd);

	var maxwait_for_unfinished = maxwait; // make a param if you like

	var progress_bar = scriptCmdData.func.command.Progress;
    progress_bar.pause = true;
    progress_bar.abort = true;
    progress_bar.Init(scriptCmdData.func.sourcetab, 'Please wait'); 			// window title
    progress_bar.SetStatus('Running threads'); 	// header
	progress_bar.Show();
	progress_bar.SetFiles(scriptCmdData.func.sourcetab.selected_files.count);
	progress_bar.Restart();


	util.sv.Set('TP') = DOpus.Create.Map();	// clear
	var tp = util.sv.Get('TP');

	// runaway stoppers for while loops
	var itermax = 1000;
	var itercnt = 0;


	var prefix = util.dopusrt + cmd;
	var current_count = 0;
	var selected_files_cnt = scriptCmdData.func.sourcetab.selstats.selfiles;
	fileloop: for (var eSelected = new Enumerator(scriptCmdData.func.sourcetab.selected), cnt = 1; !eSelected.atEnd(); eSelected.moveNext(), cnt++) {
		var selitem		= eSelected.item();
		var threadID	= getThreadID(getTS());
		var resvar		= getResVar(threadID);
		var prefix		= util.dopusrt + ' MultiThreadWorker THREADID="'+threadID+'" MAXWAIT='+maxwait+' CMD="'+cmd+'"';
		var torun		= prefix + ' FILE="' + selitem.realpath + '"';

		DOpus.Output('*************** MANAGER: ' + prefix + ', file: ' + selitem.name);
		current_count++;
		DOpus.Output('*************** Running #: ' + current_count);
		DOpus.Output('');
		DOpus.Output('');
		while(current_count > maxcount && ++itercnt < itermax) {
			DOpus.Delay(500);
			DOpus.Output("\ttoo many threads, waiting...: " + current_count + ' (iter:'+itercnt+')');
			current_count = 0;
			for (var eTP = new Enumerator(tp); !eTP.atEnd(); eTP.moveNext()) {
				var thread = eTP.item();
				if (!tp(thread)('finished')) {
					DOpus.Output('Unfinished file: ' + tp(thread)('file'));
					current_count++;
				}
			}
			DOpus.Output("\t...still running..: " + current_count);
		}

		var new_thread			= DOpus.Create.Map();
		new_thread('resvar')	= resvar;
		new_thread('cmd')		= cmd;
		new_thread('maxwait')	= maxwait;
		new_thread('file')		= selitem.realpath;
		new_thread('finished')	= false;

		tp(threadID) = new_thread;
		util.sv.Set('TP') = tp;

		progress_bar.StepFiles(1);
		progress_bar.SetTitle(cnt + '/' + selected_files_cnt);
		progress_bar.SetName(selitem.name);
		progress_bar.SetType('file');
		switch (progress_bar.GetAbortState()) {
			case 'a':
				break fileloop;
			case 'p':
				while (progress_bar.GetAbortState() !== '') { DOpus.Delay(200); if (progress_bar.GetAbortState() === 'a') break fileloop; }
				break;
		}

		DOpus.Output('*************** Starting new thread after availability... ' + selitem.name + '\n\n');
		util.cmdGlobal.RunCommand(torun);
		// uncomment this block only to test overall CPU load and ensure that it's approaching 100%
		// the results are irrelevant
		// calculate multiple hashes just to keep the CPU busy for a while
		/*
			util.cmdGlobal.RunCommand(torun);
			util.cmdGlobal.RunCommand(torun);
			util.cmdGlobal.RunCommand(torun);
			util.cmdGlobal.RunCommand(torun);
			util.cmdGlobal.RunCommand(torun);
			util.cmdGlobal.RunCommand(torun);
			util.cmdGlobal.RunCommand(torun);
			util.cmdGlobal.RunCommand(torun);
			util.cmdGlobal.RunCommand(torun);
			util.cmdGlobal.RunCommand(torun);
			util.cmdGlobal.RunCommand(torun);
			util.cmdGlobal.RunCommand(torun);
		*/

		DOpus.Output('');
		DOpus.Output('');
	}

	var ts = getTS();
	var all_finished = false;
	itercnt = 0;
	unfinished: while(!all_finished && ++itercnt < itermax && getTS() - ts <= maxwait_for_unfinished) {
		DOpus.Delay(500);
		all_finished = true;
		for (var eTP = new Enumerator(tp); !eTP.atEnd(); eTP.moveNext()) {
			var thread = eTP.item();
			if (!tp(thread)('finished')) {
				DOpus.Output('...waiting for unfinished file: ' + tp(thread)('file'));
				all_finished = false;
			}
			switch (progress_bar.GetAbortState()) {
				case 'a':
					break unfinished;
				case 'p':
					while (progress_bar.GetAbortState() !== '') { DOpus.Delay(200); if (progress_bar.GetAbortState() === 'a') break unfinished; }
					break;
			}
		}
	}

	progress_bar.ClearAbortState();
	progress_bar.Hide();

	//
	//
	// FROM THIS POINT ON, DO WHAT YOU WANT...
	//
	//
	//
	// Unfortunately any thread still running after this point will be unreachable
	//
	// Summary
	DOpus.Output('');
	DOpus.Output('');
	DOpus.Output('');
	DOpus.Output('*****************  SUMMARY');
	DOpus.Output('');
	DOpus.Output('');
	DOpus.Output('');
	for (var eTP = new Enumerator(tp); !eTP.atEnd(); eTP.moveNext()) {
		var thread = eTP.item();
		var rv = tp(thread)('resvar') + '';
		var result = util.sv.Get(rv);
		DOpus.Output('file: ' + tp(thread)('file') + ', resvar: ' + rv + ', finished: ' + tp(thread)('finished') + ', result: ' + result);
	}
}


function OnMultiThreadWorker(scriptCmdData) {
	var cmd			= scriptCmdData.func.args.CMD;
	var threadID	= scriptCmdData.func.args.THREADID;
	var maxwait		= scriptCmdData.func.args.MAXWAIT;
	var file		= scriptCmdData.func.args.FILE;
	DOpus.Output('\tWorker - threadID: ' + threadID + ', maxwait: ' + maxwait + ', cmd: ' + cmd + ', file: ' + file);

	var resvar = getResVar(threadID);
	var torun = cmd + ' RESVAR=' + resvar +' FILE="' + file + '"';

	// DOpus.Output('\t\tOnMultiThreadWorker(): maxwait: ' + maxwait + ' ' + torun);

	util.sv.Set(resvar) = false;
	util.cmdGlobal.RunCommand(torun);

	var ts1	= getTS();
	while (!util.sv.Get(resvar) && getTS() - maxwait < ts1 ) {
		DOpus.Delay(100);
	}
	util.sv.Set(resvar) = util.sv.Get(resvar) || false; // put the result back to memory
	util.sv.Get('TP')(threadID)('finished') = true;	// mark the thread as finished

	var ts2 = getTS();
	DOpus.Output('\tWorker - threadID: ' + threadID + ', elapsed: ' + Math.floor((ts2-ts1)/1000) + 's, result: ' + util.sv.Get(resvar) + '\t\t' + util.sv.Get('TP')(threadID)('finished'));
}


function OnCalcSHA256(scriptCmdData) {
	var ts1		= getTS();

	var resvar	= scriptCmdData.func.args.RESVAR;
	if (!resvar) {
		DOpus.Output('\t\tOnCalcSHA256: Cannot continue without a resvar: ' + resvar);
		return;
	}

	var item	= DOpus.FSUtil.GetItem(scriptCmdData.func.args.FILE);
	var hash	= false;
	DOpus.Output('\t\tOnCalcSHA256: ' + item.name + ', started @' + ts1);
	try {
		if (item.is_dir) return;
		hash = DOpus.FSUtil().Hash(item, 'sha256');
	} catch (e) {
		DOpus.Output('Error: ' + e.toString());
	}

	var ts2 = getTS();
	DOpus.Output('\t\tOnCalcSHA256: ' + item.name + ', finished @' + ts2);

	util.sv.Set(resvar) = hash;
	// return hash; // this wouldn't work as you expected
}

Neat way of achieving multiple threads!

Did it end up faster for SHA calculations? I'd intuitively expect them to be more bound by storage speed than CPU time, but may be wrong; I was surprised when I found out calculating more than one hash for the same file had a significant speed impact, so it may be more CPU bound than I thought.

You are most likely right. I haven't compared overall speed multi-threaded vs single-threaded; I was more focused on maximizing CPU usage. But for what it's worth, when I test it on my old spinning HDD the CPU is definitely not the bottleneck, and no matter how many threads I start the CPU never gets above 4-5%. That's where your intuition is completely right. When I run it from an NVMe or, better yet, a RAM disk, the CPU gets much better utilized, ~15-20%. If I artificially increase the load (see the block with multiple 'util.cmdGlobal.RunCommand(torun)' calls) I can make the script go bananas, up to 95%. I was just curious how far this could be taken.

Sidenote: There's a very neat program I use, ExactFile, which does exactly what I need for multi-threaded hashing, but it's no longer maintained and has a few nasty quirks, such as skipping folders with the system attribute completely. RapidCRC is another of the few that can do MT hashing, but I couldn't warm up to it and it's also unmaintained. They utilize anywhere from 20 to 50% CPU when I use an SSD or similar, but drop to 5-10% on an HDD, basically exactly as you suspected above. I started developing an MT hasher of my own in Rust, but its GUI capabilities are still very limited. Now I might use (or abuse :smiley:) DOpus for the interface & file management, do the hashing in the background with Rust, and put a GUI on top of it later. In fact I could take any ST hasher and make it MT.
Back to DOpus-land.

But there was an even more important thing that was bugging me. Sometimes I run an external program on multiple files, say a PNG optimizer or an ffmpeg converter 200 times, and I want it to finish as quickly as possible, but without opening 200 external windows at once, without waiting 200x the single-thread time, and without dividing the selection into 4-5 chunks and starting each chunk in ST mode. Finally I can cap the thread count and still run it in parallel; a simple wrapper is all it takes (rough sketch below). This alone made the effort worth it.
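For example, such a wrapper could look roughly like this. It's an untested sketch: 'OptimizePNG' and 'pngopt.exe' are placeholders, and I use WScript.Shell here so the worker actually waits for the external program to finish, which keeps the manager's MAXCOUNT throttling meaningful.

// Hypothetical worker command: register it with a 'RESVAR/K,FILE/K' template like the others,
// then run e.g.: MultiThreadManagerStart MAXCOUNT=4 MAXWAIT=600000 COMMAND "OptimizePNG"
function OnOptimizePNG(scriptCmdData) {
	var resvar = scriptCmdData.func.args.RESVAR;
	var file   = scriptCmdData.func.args.FILE;
	if (!resvar || !file) return;

	// WScript.Shell.Run with bWaitOnReturn=true blocks this worker until the program exits
	var shell    = new ActiveXObject('WScript.Shell');
	var exitCode = shell.Run('pngopt.exe "' + file + '"', 0, true);	// 0 = hidden window

	Script.vars.Set(resvar) = 'exit code ' + exitCode;	// signal completion back to the manager
}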

@Leo
To answer your question, I did a very quick, rough benchmark: 4 files totalling 15 GB, on an AMD 3900X.

RamDrive:
ST: ~57.8s
MT: ~22.3s
ST (2nd run right after MT): ~57.9s

NVME:
ST: ~63.2s
MT: ~22.4s
ST (2nd run right after MT): ~58.1s

So MT greatly increases hashing speed, as long as you use a non-spinning drive. If I had used more files it probably would have helped even more; the CPU peaked at 18% and quickly dropped as soon as some of the files were finished.

EDIT:
I made further tests.

The 18% figure seems quite reasonable to me: on a 24-HT-core machine the estimated CPU usage per core would be 100/24 ≈ 4.2% if each core processed one file exclusively, and since I used 4 files, ~18% sounds about right.

Also note the difference between ≈60s in ST mode vs ≈22s in MT mode. The expected MT runtime is not as simple as dividing 60/4 = 15s and calling the rest MT overhead. The difference comes not only from MT overhead but also from the longest-running task, because the MT manager must wait at least as long as the longest task (hashing the biggest, 5.7 GB file in this case), even if that were the only file left running.
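Put as a rough formula (my own back-of-envelope way of stating it):

	T_ST ≈ t_1 + t_2 + ... + t_n        vs.        T_MT ≳ max(t_1 ... t_n) + manager overhead

so the parallel run can never finish before the single slowest file, no matter how many idle cores there are.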

To verify this, I copied a single file 24 times with sequential names, 31 GB in total:

RamDisk:

  • MT: ≈9.5s(!) - Peak CPU: 86.9%.
  • ST: ≈125s(!) - Peak CPU: 5%

NVME:

  • MT: ≈20s - Peak CPU: 29.3%
  • ST: not measured

Basically no file takes longer than the others. Note how the MT runtime drops immensely, from 22.3s down to 9.5s on the RAM disk and to 20s on NVMe, despite twice the total size of the initial benchmark. Also note the CPU usage difference between the two: on a RAM disk, disk speed is not the bottleneck and all cores are utilized almost to the max, but even on a very fast NVMe the disk read speed becomes the bottleneck and starts holding the CPU back.

==> Multi-threading helps immensely, regardless of the MT overhead or disk speed, as long as it is not a classical HDD.


Found this old thread and decided to try and convert the script to work as a custom column.

It's pretty janky, but I think it technically works, though in this case it doesn't seem to be any faster than the synchronous hashing column. For other custom columns it may do better.

I still wouldn't really use it, since it still has the problem that it apparently can't be stopped if it starts taking too long lol. But I figured I'd post it in case someone wants to work on it further.

Because of the way script columns work (at least as I understand them), I had to create some script variables that the parallel part puts its results into. What happens is that OnColumns starts the parallel threads going, then just keeps looping on the first item until it sees that everything is done, and only then lets Opus continue through each file. Each file then looks up its own result in the results map (see the distilled sketch below). There might be a more efficient way to do it.
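Distilled down to the core pattern (the full script below is the real version with all the bookkeeping), OnColumns effectively does this:

// Sketch only - relies on the helpers (util, getTS, FirstGetAllTabItems) defined in the full script below.
// The first item kicks off the parallel manager once; every item then just polls the shared
// 'resultsMap' script variable until its own entry appears (or the wait times out).
function OnColumns_sketch(scriptColData) {
	if (!Script.vars.Exists('allItems') || Script.vars.Get('allItems').empty) {
		Script.vars.Set('allItems', FirstGetAllTabItems(scriptColData.tab));
		util.cmdGlobal.RunCommand(util.dopusrt + ' MultiThreadManagerStart MAXCOUNT=8 MAXWAIT=150 COMMAND="CalcSHA256"');
	}

	var itemPath = scriptColData.item.realpath;
	var tsStart  = getTS();
	while (!Script.vars.Exists('resultsMap') ||
	       (!Script.vars.Get('resultsMap').exists(itemPath) && getTS() - tsStart < 7000)) {
		DOpus.Delay(50);
	}

	if (Script.vars.Get('resultsMap').exists(itemPath)) {
		scriptColData.value = Script.vars.Get('resultsMap')(itemPath);
	} else {
		scriptColData.value = '[Took Too Long]';
	}
}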

// MultiThread Test
// (c) 2021 cu
var util = {};
util.cmdGlobal	= DOpus.Create.Command;
util.sv			= Script.vars;
util.dopusrt	= 'dopusrt /acmd ';


/*
	Proof of Concept - Multi-Threaded Commands

	To test this, create a new button as such:

	@nodeselect
	MultiThreadManagerStart MAXCOUNT=8 MAXWAIT=5000 COMMAND "CalcSHA256"

	Basically the command which needs to be run in parallel, "CalcSHA256" in this case,
	must at least have a parameter called
		RESVAR (e.g. cmd.template='RESVAR/K, ...')
	and must set it before returning, e.g.
		Script.vars.Set(resvar) = calculated_hash;
	and the Thread Manager and Thread Workers will take care of the rest.

	The reason why RESVAR is necessary is that there is no possibility for Script Commands
	to directly return a value with standard JS, i.e. 'return myval;' ...that doesn't work.

	And since we run basically everything via 'dopusrt /acmd' anyway,
	the target command "CalcSHA256" is run completely asynchronously in a thread
	from which we would have no possibility to receive the return value.
	Now you know.
*/

// Called by Directory Opus to initialize the script
function OnInit(initData)
{
	initData.name = "Test Hash Parallel";
	initData.version = "1.0";
	initData.copyright = "(c) 2021 cuneytyilmaz.com";
	initData.desc = "";
	initData.default_enable = true;
	initData.min_version = "12.0";

	var cmd         = initData.AddCommand();
	cmd.name        = 'MultiThreadManagerStart';
	cmd.method      = 'OnMultiThreadManagerStart';
	cmd.template    = 'MAXCOUNT/N,MAXWAIT/N,COMMAND/K';
	cmd.label		= 'Start Multi Threaded Command';
	cmd.desc        = 'not yet';

	var cmd         = initData.AddCommand();
	cmd.name        = 'MultiThreadWorker';
	cmd.method      = 'OnMultiThreadWorker';
	cmd.template    = 'THREADID/K,MAXWAIT/N,CMD/K,FILE/K';
	cmd.label		= 'Start Multi Threaded Command';
	cmd.desc        = 'not yet';

	var cmd         = initData.AddCommand();
	cmd.name        = 'CalcSHA256';
	cmd.method      = 'OnCalcSHA256';
	cmd.template    = 'RESVAR/K,FILE/K';
	cmd.label		= 'Calc SHA-256';
	cmd.desc        = 'not yet';
	
}

// 2
function OnAddColumns(addColData)
{

    AddColumn(addColData, "TestHashParallel", "Test Hash Parallel", "testhashparallel");
}

// 3
function AddColumn(addColData, colName, colLabel, checkType)
{
    var col = addColData.AddColumn();
    col.name = colName;
    col.label = colLabel;
    col.header = 'Test Hash Parallel';
    col.method = "OnColumns";
    col.multicol = false;
    col.autogroup = true;
    col.autorefresh = true;
    col.userdata = checkType;
    col.namerefresh = true; // Refresh the name after each change
	col.justify = "left";

	Script.vars.Set("maxcount", 8)
	Script.vars.Set("maxwait", 150)
	Script.vars.Set("cmd", "CalcSHA256")
	Script.vars.Set('resultsMap', DOpus.Create.Map());
	Script.vars.Set('processedCount', 0);
	Script.vars.Set('totalCount', 0);

	//DOpus.Output("---------------- Random " + Math.floor(100 + Math.random() * 899)) + " ------------------------";

	Script.vars.Set("allItems", DOpus.Create.Vector());
	//DOpus.Output("Exists: " + Script.vars.exists("allItems"));
	//DOpus.Output("Here I am running! all items = " + Script.vars.Get('allItems'));
}

//----------------------------------------------------------------------------------------------------------


function OnColumns(scriptColData)
{
	var maxcount= Script.vars.Get('maxcount');
	var cmd	= Script.vars.Get('cmd');
	var maxwait	= Script.vars.Get('maxwait');

	var allItems;
	if (Script.vars.Exists('allItems')) {
		allItems = Script.vars.Get('allItems');
	} else {
		Script.vars.Set("allItems", DOpus.Create.Vector());
		allItems = Script.vars.Get('allItems');
	}

	
	// Only get all items if this is the first item being run
	if (allItems.empty) {
		//DOpus.Output("allItems appears empty");
		util.sv.Set('allItems', FirstGetAllTabItems(scriptColData.tab));

		//MultiThreadManagerStart MAXCOUNT=8 MAXWAIT=5000 COMMAND "CalcSHA256"
		var torun = util.dopusrt + ' MultiThreadManagerStart MAXCOUNT="'+maxcount+'" MAXWAIT='+maxwait+' COMMAND="'+cmd+'"';
		//DOpus.Output("To Run: " + torun);
		util.cmdGlobal.RunCommand(torun);
	}
	
	var itemPath = scriptColData.item.realpath;

	// Wait for the result to be available
    var tsStart = getTS();
	var fullWait = 7000;
    //while (!Script.vars.Get('resultsMap').exists(itemPath) && (getTS() - tsStart) < fullWait) {
	while (!Script.vars.Exists('resultsMap') || (!Script.vars.Get('resultsMap').exists(itemPath) && (getTS() - tsStart) < fullWait)) {
        DOpus.Delay(50); // Wait for 100 milliseconds before checking again
		//DOpus.Output("Waiting...");
		//DOpus.Output("Result Count: " + Script.vars.Get('resultsMap').count);
    }

	
    if (Script.vars.Get('resultsMap').exists(itemPath)) {
        scriptColData.value = Script.vars.Get('resultsMap')(itemPath);
		Script.vars.Set('processedCount', Script.vars.Get('processedCount')+1);
    } else {
        scriptColData.value = "[Took Too Long]";
    }

	if (Script.vars.Get('processedCount') === Script.vars.Get('totalCount')) {
		Script.vars.Delete('allItems');
	}
}

//----------------------------------------------------------------------------------------------------------

function FirstGetAllTabItems(currentTab) {
    var allItemsLocal = DOpus.Create.Vector();
    var folderEnum = new Enumerator(currentTab.files); // Enumerate all items in the active tab
    for (; !folderEnum.atEnd(); folderEnum.moveNext()) {
        var item = folderEnum.item();
        //DOpus.Output("Adding Item: " + item.realpath);
        allItemsLocal.push_back(item); // Add the item to the Vector
    }
	//DOpus.Output("allItemsLocal Count: " + allItemsLocal.count);
	Script.vars.Set('totalCount', allItemsLocal.count);
    return allItemsLocal;
}

//----------------------------------------------------------------------------------------------------------

function getTS() {
	return new Date().getTime();
}
function getThreadID(ts) {
	return 't_' + ts + '_' + Math.floor(100 + Math.random() * 899);
}
function getResVar(tid) {
	return 'v_' + tid;
}

function OnMultiThreadManagerStart(scriptCmdData) {
	DOpus.ClearOutput();
	var maxcount= scriptCmdData.func.args.MAXCOUNT;
	var cmd		= scriptCmdData.func.args.COMMAND;
	var maxwait	= scriptCmdData.func.args.MAXWAIT;

	if (!maxwait) {
		// if no max wait given use something else
		maxwait = 60*60*1000; // 1 hour in millisecs
	}
	//DOpus.Output('Thread count: ' + maxcount + ', maxwait: ' + maxwait + ', command: ' + cmd);

	var maxwait_for_unfinished = maxwait; // make a param if you like

	//var progress_bar = scriptCmdData.func.command.Progress;
    //progress_bar.pause = true;
    //progress_bar.abort = true;
    //progress_bar.Init(scriptCmdData.func.sourcetab, 'Please wait'); 			// window title
    //progress_bar.SetStatus('Running threads'); 	// header
	//progress_bar.Show();
	//progress_bar.SetFiles(scriptCmdData.func.sourcetab.selected_files.count);
	//progress_bar.Restart();


	util.sv.Set('TP') = DOpus.Create.Map();	// clear
	var tp = util.sv.Get('TP');

	// runaway stoppers for while loops
	var itermax = 1000;
	var itercnt = 0;


	var prefix = util.dopusrt + cmd;
	var current_count = 0;
	//var selected_files_cnt = scriptCmdData.func.sourcetab.selstats.selfiles;
	var selected_files_cnt = Script.vars.Get('allItems').count;
	//fileloop: for (var eSelected = new Enumerator(scriptCmdData.func.sourcetab.selected), cnt = 1; !eSelected.atEnd(); eSelected.moveNext(), cnt++) {
	fileloop: for (var eSelected = new Enumerator(Script.vars.Get('allItems')), cnt = 1; !eSelected.atEnd(); eSelected.moveNext(), cnt++) {	
		var selitem		= eSelected.item();
		var threadID	= getThreadID(getTS());
		var resvar		= getResVar(threadID);
		var prefix		= util.dopusrt + ' MultiThreadWorker THREADID="'+threadID+'" MAXWAIT='+maxwait+' CMD="'+cmd+'"';
		var torun		= prefix + ' FILE="' + selitem.realpath + '"';

		//DOpus.Output('*************** MANAGER: ' + prefix + ', file: ' + selitem.name);
		current_count++;
		//DOpus.Output('*************** Running #: ' + current_count);
		//DOpus.Output('');
		//DOpus.Output('');
		while(current_count > maxcount && ++itercnt < itermax) {
			DOpus.Delay(500);
			//DOpus.Output("\ttoo many threads, waiting...: " + current_count + ' (iter:'+itercnt+')');
			current_count = 0;
			for (var eTP = new Enumerator(tp); !eTP.atEnd(); eTP.moveNext()) {
				var thread = eTP.item();
				if (!tp(thread)('finished')) {
					//DOpus.Output('Unfinished file: ' + tp(thread)('file'));
					current_count++;
				}
			}
			//DOpus.Output("\t...still running..: " + current_count);
		}

		var new_thread			= DOpus.Create.Map();
		new_thread('resvar')	= resvar;
		new_thread('cmd')		= cmd;
		new_thread('maxwait')	= maxwait;
		new_thread('file')		= selitem.realpath;
		new_thread('finished')	= false;

		tp(threadID) = new_thread;
		util.sv.Set('TP') = tp;

		//progress_bar.StepFiles(1);
		//progress_bar.SetTitle(cnt + '/' + selected_files_cnt);
		//progress_bar.SetName(selitem.name);
		//progress_bar.SetType('file');
		//switch (progress_bar.GetAbortState()) {
		//	case 'a':
		//		break fileloop;
		//	case 'p':
		//		while (progress_bar.GetAbortState() !== '') { DOpus.Delay(200); if (progress_bar.GetAbortState() === 'a') break fileloop; }
		//		break;
		//}

		//DOpus.Output('*************** Starting new thread after availability... ' + selitem.name + '\n\n');
		util.cmdGlobal.RunCommand(torun);
		// uncomment this block only to test overall CPU load and ensure that it's approaching 100%
		// the results are irrelevant
		// calculate multiple hashes just to keep the CPU busy for a while
		/*
			util.cmdGlobal.RunCommand(torun);
			util.cmdGlobal.RunCommand(torun);
			util.cmdGlobal.RunCommand(torun);
			util.cmdGlobal.RunCommand(torun);
			util.cmdGlobal.RunCommand(torun);
			util.cmdGlobal.RunCommand(torun);
			util.cmdGlobal.RunCommand(torun);
			util.cmdGlobal.RunCommand(torun);
			util.cmdGlobal.RunCommand(torun);
			util.cmdGlobal.RunCommand(torun);
			util.cmdGlobal.RunCommand(torun);
			util.cmdGlobal.RunCommand(torun);
		*/

		//DOpus.Output('');
		//DOpus.Output('');
	}

	var ts = getTS();
	var all_finished = false;
	itercnt = 0;
	unfinished: while(!all_finished && ++itercnt < itermax && getTS() - ts <= maxwait_for_unfinished) {
		DOpus.Delay(500);
		all_finished = true;
		for (var eTP = new Enumerator(tp); !eTP.atEnd(); eTP.moveNext()) {
			var thread = eTP.item();
			if (!tp(thread)('finished')) {
				//DOpus.Output('...waiting for unfinished file: ' + tp(thread)('file'));
				all_finished = false;
			}
			//switch (progress_bar.GetAbortState()) {
			//	case 'a':
			//		break unfinished;
			//	case 'p':
			//		while (progress_bar.GetAbortState() !== '') { DOpus.Delay(200); if (progress_bar.GetAbortState() === 'a') break unfinished; }
			//		break;
			//}
		}
	}

	//progress_bar.ClearAbortState();
	//progress_bar.Hide();

	//
	//
	// FROM THIS POINT ON, DO WHAT YOU WANT...
	//
	//
	//
	// Unfortunately any thread still running after this point will be unreachable
	//
	// Summary
	//DOpus.Output('');
	//DOpus.Output('');
	//DOpus.Output('');
	//DOpus.Output('*****************  SUMMARY');
	//DOpus.Output('');
	//DOpus.Output('');
	//DOpus.Output('');
	for (var eTP = new Enumerator(tp); !eTP.atEnd(); eTP.moveNext()) {
		var thread = eTP.item();
		var rv = tp(thread)('resvar') + '';
		var result = util.sv.Get(rv);
		//resultsMap(tp(thread)('file')) = result;
		var newResultsMap = Script.vars.Get('resultsMap');
		newResultsMap.Set(tp(thread)('file'), result);
		Script.vars.Set('resultsMap', newResultsMap); // Update the script variable
		
		//DOpus.Output('file: ' + tp(thread)('file') + ', resvar: ' + rv + ', finished: ' + tp(thread)('finished') + ', result: ' + result);
	}
}


function OnMultiThreadWorker(scriptCmdData) {
	var cmd			= scriptCmdData.func.args.CMD;
	var threadID	= scriptCmdData.func.args.THREADID;
	var maxwait		= scriptCmdData.func.args.MAXWAIT;
	var file		= scriptCmdData.func.args.FILE;
	//DOpus.Output('\tWorker - threadID: ' + threadID + ', maxwait: ' + maxwait + ', cmd: ' + cmd + ', file: ' + file);

	var resvar = getResVar(threadID);
	var torun = cmd + ' RESVAR=' + resvar +' FILE="' + file + '"';

	// DOpus.Output('\t\tOnMultiThreadWorker(): maxwait: ' + maxwait + ' ' + torun);

	util.sv.Set(resvar) = false;
	util.cmdGlobal.RunCommand(torun);

	var ts1	= getTS();
	while (!util.sv.Get(resvar) && getTS() - maxwait < ts1 ) {
		DOpus.Delay(100);
	}
	util.sv.Set(resvar) = util.sv.Get(resvar) || false; // put the result back to memory
	util.sv.Get('TP')(threadID)('finished') = true;	// mark the thread as finished

	var ts2 = getTS();
	//DOpus.Output('\tWorker - threadID: ' + threadID + ', elapsed: ' + Math.floor((ts2-ts1)/1000) + 's, result: ' + util.sv.Get(resvar) + '\t\t' + util.sv.Get('TP')(threadID)('finished'));
}


function OnCalcSHA256(scriptCmdData) {
	var ts1		= getTS();

	var resvar	= scriptCmdData.func.args.RESVAR;
	if (!resvar) {
		//DOpus.Output('\t\tOnCalcSHA256: Cannot continue without a resvar: ' + resvar);
		return;
	}

	var item	= DOpus.FSUtil.GetItem(scriptCmdData.func.args.FILE);
	var hash	= false;
	//DOpus.Output('\t\tOnCalcSHA256: ' + item.name + ', started @' + ts1);
	try {
		if (item.is_dir) return;
		hash = DOpus.FSUtil().Hash(item, 'sha256');
	} catch (e) {
		DOpus.Output('Error: ' + e.toString());
	}

	var ts2 = getTS();
	//DOpus.Output('\t\tOnCalcSHA256: ' + item.name + ', finished @' + ts2);

	util.sv.Set(resvar) = hash;
	// return hash; // this wouldn't work as you expected
}

There's a repository on my GitHub which I haven't announced, or updated from my local copy, in a long while. It does knapsacking and communicates with the main thread too, exactly what you're describing here, i.e. distributing the threads as quickly & efficiently as possible. And it has everything from premade buttons to progress tracking, verifying, exporting, importing, smart-updating, dirty-detection, setting the number of threads, and a ton of other utility features and functions. I have recently even implemented a duplicate checker and a smart directory comparison, which is extremely powerful: basically it syncs 2 directories just like DOpus sync, but it also auto-detects renamed or moved files and simply renames/moves them on the destination. That saves a lot of copying time if you rename/move big files to/from a slow external disk, for example.

However, there is indeed an inherent problem, as you correctly recognized: the script allows interrupting a multi-threaded operation only after all currently running threads finish, i.e. if you're verifying 20 small files and 1 big 20 GB file, the whole operation aborts only once the big file is finished. Regardless, on an M.2 drive with a decent CPU you rarely need to wait long. And for old-school spinning drives: a) I use only 1 thread to avoid disk thrashing, and b) I rarely run hash/verify on such a disk while I'm at the PC; I just run it overnight or go for groceries, etc.

This script has been a major part of my daily workflow, and I've been constantly tweaking it. Check it out and let me know if you're interested, because there are no end-user-friendly setup instructions yet, but I can help and can commit my local copy soon...ish.

Wow, an enormous amount of work clearly went into that.

Not exactly related, but since you care a lot about data integrity, I figured I'd also mention Parchive files in case you weren't already aware. Basically you create 'parity' files (PAR2 files) using one of any number of par2 tools, and those parity files can later repair damaged files. It's a different use case than your script, so I'm not at all suggesting it as a replacement.

But I imagine that if you have a set of files you know will stay static, like an archive, you could create the parity files up front, and later, if your script detects any corruption, use the parity files to repair it.
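A rough sketch of how that could be glued into Opus via JScript (untested; it assumes par2cmdline's 'par2' executable is on PATH, and the exact 'create -r10' / 'repair' invocations should be double-checked against the tool's own help):

// Hypothetical script command: build PAR2 recovery data for each selected file.
// Would need to be registered via AddCommand in OnInit, like the commands in the scripts above.
function OnCreatePar2(scriptCmdData) {
	var shell = new ActiveXObject('WScript.Shell');
	for (var e = new Enumerator(scriptCmdData.func.sourcetab.selected_files); !e.atEnd(); e.moveNext()) {
		var item = e.item();
		// -r10 = roughly 10% redundancy; 0 = hidden window, true = wait for completion
		shell.Run('par2 create -r10 "' + item.realpath + '.par2" "' + item.realpath + '"', 0, true);
	}
	// Later, if a hash check flags a file as corrupted, repair would be along the lines of:
	//   shell.Run('par2 repair "' + item.realpath + '.par2"', 0, true);
}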


RAR is great when it comes to recovery data.
