How to run a script command in parallel (multi-threading)

@Leo gave me a tip here a while ago when I asked in passing how I could run a script command once for each selected file individually, instead of once for all files. This is no problem if you execute an external command and use @async, but @async does not work for internal commands or inline scripts, including the "rename * to * @script..." trick. And with external programs you cannot limit the number of parallel threads anyway.
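For reference, an external program can be fired off per selected file with a button along these lines (the exe path is only a placeholder); each file gets its own asynchronous process, but there is no way to cap how many run at once:

@async:C:\Tools\mytool.exe {filepath}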

And I was looking at Leo's SHA script today. Apart from the 512 MB bug they mention, I still liked the idea of having some checksum columns, but I quickly sobered up again after seeing how it runs in a single thread, one file at a time.

Well, we have so many CPU cores and hyperthreading now, so why not use 100% of the CPU? So, putting the two above together, I quickly concocted a multi-threaded command wrapper with MAXCOUNT/N, MAXWAIT/N, COMMAND/K. Now you can select a bunch of files, set MAXCOUNT to X and start hashing in parallel. It would easily work for other user commands as well. It's an ugly hack, but it works; in fact I'm planning to use this for quite a lot of ideas, like putting hash values into my recent favorite, ADS.

There are 2 caveats though:

  • The called command must receive a "RESVAR" parameter and set it back in Script.Vars so that the thread manager can pick up the result (see the minimal sketch after this list).
  • If a thread is still running after the timeout is reached, it cannot be killed and no value can be received from it; it will simply continue running in the background. So the command you call must make sure it does not run forever.
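As a minimal sketch of that contract (the command name "DoMyWork" and its body are hypothetical; only the RESVAR handling matters, mirroring the CalcSHA256 command further below):

// Hypothetical worker-callable command; register it in OnInit with
// cmd.template = 'RESVAR/K,FILE/K' just like CalcSHA256 below.
function OnDoMyWork(scriptCmdData) {
	var resvar = scriptCmdData.func.args.RESVAR;	// set by the thread worker wrapper
	var file   = scriptCmdData.func.args.FILE;
	if (!resvar) return;							// nobody to report back to

	var result = false;
	try {
		// ...do the actual per-file work here, with some upper bound on runtime...
		result = 'some value for ' + file;
	} catch (e) { /* leave result as false */ }

	// 'return result;' would be lost; the thread manager only sees Script.vars
	Script.vars.Set(resvar) = result;
}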

This is just a proof of concept and still a hack. I bet you'll have some great ideas for improvements.

// MultiThread Test
// (c) 2021 cu
var util = {};
util.cmdGlobal	= DOpus.Create.Command;
util.sv			= Script.vars;
util.dopusrt	= 'dopusrt /acmd ';
/*
	Proof of Concept - Multi-Threaded Commands

	To test this, create a new button as such:

	@nodeselect
	MultiThreadManagerStart MAXCOUNT=8 MAXWAIT=5000 COMMAND "CalcSHA256"

	Basically the command which needs to be run in parallel, "CalcSHA256" in this case,
	must at least have a parameter called
		RESVAR (e.g. cmd.template='RESVAR/K, ...')
	and must set it before returning, e.g.
		Script.vars.Set(resvar) = calculated_hash;
	and the Thread Manager and Thread Workers will take care of the rest.

	The reason why RESVAR is necessary is that there is no possibility for Script Commands
	to directly return a value with standard JS, i.e. 'return myval;' ...that doesn't work.

	And since we run basically everything via 'dopusrt /acmd' anyway,
	the target command "CalcSHA256" is run completely asynchronously in a thread
	from which we would have no possibility to receive the return value.
	Now you know.
*/

// Called by Directory Opus to initialize the script
function OnInit(initData)
{
	initData.name = "MultiThread Test";
	initData.version = "1.0";
	initData.copyright = "(c) 2021 cuneytyilmaz.com";
	initData.desc = "";
	initData.default_enable = true;
	initData.min_version = "12.0";

	var cmd         = initData.AddCommand();
	cmd.name        = 'MultiThreadManagerStart';
	cmd.method      = 'OnMultiThreadManagerStart';
	cmd.template    = 'MAXCOUNT/N,MAXWAIT/N,COMMAND/K';
	cmd.label		= 'Start Multi Threaded Command';
	cmd.desc        = 'not yet';

	var cmd         = initData.AddCommand();
	cmd.name        = 'MultiThreadWorker';
	cmd.method      = 'OnMultiThreadWorker';
	cmd.template    = 'THREADID/K,MAXWAIT/N,CMD/K,FILE/K';
	cmd.label		= 'Multi Threaded Command Worker';
	cmd.desc        = 'not yet';

	var cmd         = initData.AddCommand();
	cmd.name        = 'CalcSHA256';
	cmd.method      = 'OnCalcSHA256';
	cmd.template    = 'RESVAR/K,FILE/K';
	cmd.label		= 'Calc SHA-256';
	cmd.desc        = 'not yet';
}


function getTS() {
	return new Date().getTime();
}
function getThreadID(ts) {
	return 't_' + ts + '_' + Math.floor(100 + Math.random() * 899);
}
function getResVar(tid) {
	return 'v_' + tid;
}

function OnMultiThreadManagerStart(scriptCmdData) {
	DOpus.ClearOutput();
	var maxcount= scriptCmdData.func.args.MAXCOUNT;
	var cmd		= scriptCmdData.func.args.COMMAND;
	var maxwait	= scriptCmdData.func.args.MAXWAIT;

	if (!maxwait) {
		// if no max wait given use something else
		maxwait = 60*60*1000; // 1 hour in millisecs
	}
	DOpus.Output('Thread count: ' + maxcount + ', maxwait: ' + maxwait + ', command: ' + cmd);

	var maxwait_for_unfinished = maxwait; // make a param if you like

	var progress_bar = scriptCmdData.func.command.Progress;
	progress_bar.pause = true;
	progress_bar.abort = true;
	progress_bar.Init(scriptCmdData.func.sourcetab, 'Please wait');	// window title
	progress_bar.SetStatus('Running threads');	// header
	progress_bar.Show();
	progress_bar.SetFiles(scriptCmdData.func.sourcetab.selected_files.count);
	progress_bar.Restart();


	util.sv.Set('TP') = DOpus.Create.Map();	// clear
	var tp = util.sv.Get('TP');

	// runaway stoppers for while loops
	var itermax = 1000;
	var itercnt = 0;


	var current_count = 0;
	var selected_files_cnt = scriptCmdData.func.sourcetab.selstats.selfiles;
	fileloop: for (var eSelected = new Enumerator(scriptCmdData.func.sourcetab.selected), cnt = 1; !eSelected.atEnd(); eSelected.moveNext(), cnt++) {
		var selitem		= eSelected.item();
		var threadID	= getThreadID(getTS());
		var resvar		= getResVar(threadID);
		var prefix		= util.dopusrt + ' MultiThreadWorker THREADID="'+threadID+'" MAXWAIT='+maxwait+' CMD="'+cmd+'"';
		var torun		= prefix + ' FILE="' + selitem.realpath + '"';

		DOpus.Output('*************** MANAGER: ' + prefix + ', file: ' + selitem.name);
		current_count++;
		DOpus.Output('*************** Running #: ' + current_count);
		DOpus.Output('');
		DOpus.Output('');
		while(current_count > maxcount && ++itercnt < itermax) {
			DOpus.Delay(500);
			DOpus.Output("\ttoo many threads, waiting...: " + current_count + ' (iter:'+itercnt+')');
			var current_count = 0;
			for (var eTP = new Enumerator(tp); !eTP.atEnd(); eTP.moveNext()) {
				var thread = eTP.item();
				if (!tp(thread)('finished')) {
					DOpus.Output('Unfinished file: ' + tp(thread)('file'));
					current_count++;
				}
			}
			DOpus.Output("\t...still running..: " + current_count);
		}

		var new_thread			= DOpus.Create.Map();
		new_thread('resvar')	= resvar;
		new_thread('cmd')		= cmd;
		new_thread('maxwait')	= maxwait;
		new_thread('file')		= selitem.realpath;
		new_thread('finished')	= false;

		tp(threadID) = new_thread;
		util.sv.Set('TP') = tp;

		progress_bar.StepFiles(1);
		progress_bar.SetTitle(cnt + '/' + selected_files_cnt);
		progress_bar.SetName(selitem.name);
		progress_bar.SetType('file');
		switch (progress_bar.GetAbortState()) {
			case 'a':
				break fileloop;
			case 'p':
				while (progress_bar.GetAbortState() !== '') { DOpus.Delay(200); if (progress_bar.GetAbortState() === 'a') break fileloop; }
				break;
		}

		DOpus.Output('*************** Starting new thread after availability... ' + selitem.name + '\n\n');
		util.cmdGlobal.RunCommand(torun);
		// uncomment this block only to test overall CPU load and ensure that it's approaching 100%
		// the results are irrelevant
		// calculate multiple hashes just to keep the CPU busy for a while
		/*
			util.cmdGlobal.RunCommand(torun);
			util.cmdGlobal.RunCommand(torun);
			util.cmdGlobal.RunCommand(torun);
			util.cmdGlobal.RunCommand(torun);
			util.cmdGlobal.RunCommand(torun);
			util.cmdGlobal.RunCommand(torun);
			util.cmdGlobal.RunCommand(torun);
			util.cmdGlobal.RunCommand(torun);
			util.cmdGlobal.RunCommand(torun);
			util.cmdGlobal.RunCommand(torun);
			util.cmdGlobal.RunCommand(torun);
			util.cmdGlobal.RunCommand(torun);
		*/

		DOpus.Output('');
		DOpus.Output('');
	}

	var ts = getTS();
	var all_finished = false;
	itercnt = 0;
	unfinished: while(!all_finished && ++itercnt < itermax && getTS() - ts <= maxwait_for_unfinished) {
		DOpus.Delay(500);
		all_finished = true;
		for (var eTP = new Enumerator(tp); !eTP.atEnd(); eTP.moveNext()) {
			var thread = eTP.item();
			if (!tp(thread)('finished')) {
				DOpus.Output('...waiting for unfinished file: ' + tp(thread)('file'));
				all_finished = false;
			}
			switch (progress_bar.GetAbortState()) {
				case 'a':
					break unfinished;
				case 'p':
					while (progress_bar.GetAbortState() !== '') { DOpus.Delay(200); if (progress_bar.GetAbortState() === 'a') break unfinished; }
					break;
			}
		}
	}

	progress_bar.ClearAbortState();
	progress_bar.Hide();

	//
	//
	// FROM THIS POINT ON, DO WHAT YOU WANT...
	//
	//
	//
	// Unfortunately any thread still running after this point will be unreachable
	//
	// Summary
	DOpus.Output('');
	DOpus.Output('');
	DOpus.Output('');
	DOpus.Output('*****************  SUMMARY');
	DOpus.Output('');
	DOpus.Output('');
	DOpus.Output('');
	for (var eTP = new Enumerator(tp); !eTP.atEnd(); eTP.moveNext()) {
		var thread = eTP.item();
		var rv = tp(thread)('resvar') + '';
		var result = util.sv.Get(rv);
		DOpus.Output('file: ' + tp(thread)('file') + ', resvar: ' + rv + ', finished: ' + tp(thread)('finished') + ', result: ' + result);
	}
}


function OnMultiThreadWorker(scriptCmdData) {
	var cmd			= scriptCmdData.func.args.CMD;
	var threadID	= scriptCmdData.func.args.THREADID;
	var maxwait		= scriptCmdData.func.args.MAXWAIT;
	var file		= scriptCmdData.func.args.FILE;
	DOpus.Output('\tWorker - threadID: ' + threadID + ', maxwait: ' + maxwait + ', cmd: ' + cmd + ', file: ' + file);

	var resvar = getResVar(threadID);
	var torun = cmd + ' RESVAR=' + resvar +' FILE="' + file + '"';

	// DOpus.Output('\t\tOnMultiThreadWorker(): maxwait: ' + maxwait + ' ' + torun);

	util.sv.Set(resvar) = false;
	util.cmdGlobal.RunCommand(torun);

	var ts1	= getTS();
	while (!util.sv.Get(resvar) && getTS() - maxwait < ts1 ) {
		DOpus.Delay(100);
	}
	util.sv.Set(resvar) = util.sv.Get(resvar) || false; // put the result back to memory
	util.sv.Get('TP')(threadID)('finished') = true;	// mark the thread as finished

	var ts2 = getTS();
	DOpus.Output('\tWorker - threadID: ' + threadID + ', elapsed: ' + Math.floor((ts2-ts1)/1000) + 's, result: ' + util.sv.Get(resvar) + '\t\t' + util.sv.Get('TP')(threadID)('finished'));
}


function OnCalcSHA256(scriptCmdData) {
	var ts1		= getTS();

	var resvar	= scriptCmdData.func.args.RESVAR;
	if (!resvar) {
		DOpus.Output('\t\tOnCalcSHA256: Cannot continue without a resvar: ' + resvar);
		return;
	}

	var item	= DOpus.FSUtil.GetItem(scriptCmdData.func.args.FILE);
	var hash	= false;
	DOpus.Output('\t\tOnCalcSHA256: ' + item.name + ', started @' + ts1);
	try {
		// folders are skipped; report a non-false value so the worker does not wait until MAXWAIT
		if (item.is_dir) {
			util.sv.Set(resvar) = 'skipped (folder)';
			return;
		}
		hash = DOpus.FSUtil().Hash(item, 'sha256');
	} catch (e) {
		DOpus.Output('Error: ' + e.toString());
	}

	var ts2 = getTS();
	DOpus.Output('\t\tOnCalcSHA256: ' + item.name + ', finished @' + ts2);

	util.sv.Set(resvar) = hash;
	// return hash; // this wouldn't work as you expected
}

Neat way of achieving multiple threads!

Did it end up faster for SHA calculations? I'd intuitively expect them to be more bound by storage speed than CPU time, but I may be wrong; I was surprised when I found that calculating more than one hash for the same file had a significant speed impact, so it may be more CPU-bound than I thought.

You are most likely right. I haven't compared overall speed in multi-threaded vs single-threaded mode; I was more focused on maximizing CPU usage. But for what it's worth: when I test it with my old spinning HDD, the CPU is definitely not the bottleneck, and no matter how many threads I start, CPU usage never gets above 4-5%. That's where your intuition is completely right. When I run it on an NVME, or better yet a RamDisk, the CPU quickly gets much better utilized, ~15-20%. If I artificially increase the load (see the block with multiple 'util.cmdGlobal.RunCommand(torun)' calls), I can make the script go bananas, up to 95%. I was just curious how far this can be taken.

Sidenote: There's a very neat program I use, ExactFile, which does exactly what I need for multi-threaded hashing, but it's not maintained anymore and has a few nasty quirks, such as skipping attrib=system folders completely. RapidCRC is another one of the few that can do MT hashing, but I couldn't warm up to it and it's also not maintained. They utilize anywhere from 20 to 50% CPU when I use an SSD or the like, but drop down to 5-10% on an HDD, basically exactly as you suspected above. I started developing an MT hasher of my own in Rust, but its GUI capabilities are still very limited. Now I might use (or abuse :smiley:) DOpus for the interface & file management, do the hashing in the background with Rust, and put a GUI on top of it later. In fact I could take any ST hasher and make it MT.
Back to DOpus-land.

But there was an even more important thing that was bugging me. Sometimes I run an external program for multiple files, say a PNG optimizer or an ffmpeg converter 200 times, and I want it to run as quickly as possible, but without opening 200 external windows at once, without waiting 200x the single-thread time, and without dividing the file selection into 4-5 chunks and starting each in ST mode. Finally I can limit the thread count and still run it in parallel; a simple wrapper command is all it takes (see the sketch below). This alone made the effort worth it.
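As an illustration, such a wrapper could look roughly like this. Everything here is hypothetical (the command name, exe path and arguments); it would be registered in OnInit with cmd.template = 'RESVAR/K,FILE/K' like CalcSHA256 above and started via e.g. MultiThreadManagerStart MAXCOUNT=4 COMMAND "OptimizePNG":

// Hypothetical wrapper around an external program so the thread manager can drive it.
function OnOptimizePNG(scriptCmdData) {
	var resvar = scriptCmdData.func.args.RESVAR;
	var file   = scriptCmdData.func.args.FILE;
	if (!resvar) return;

	var exitCode = -1;
	try {
		var shell = new ActiveXObject('WScript.Shell');
		// 0 = hidden window, true = wait for the program to finish before returning
		exitCode = shell.Run('"C:\\Tools\\pngopt.exe" "' + file + '"', 0, true);
	} catch (e) {
		DOpus.Output('OptimizePNG error: ' + e.toString());
	}

	Script.vars.Set(resvar) = exitCode;	// report back to the thread manager
}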

@Leo
To answer your question, I did a very quick, rough benchmark: 4 files totalling 15 GB on an AMD 3900X

RamDrive:
ST: ~57.8s
MT: ~22.3s
ST (2nd run right after MT): ~57.9s

NVME:
ST: ~63.2s
MT: ~22.4s
ST (2nd run right after MT): ~58.1s

So MT greatly increases hashing speed as long as you use a non-spinning drive. If I had used more files, it probably would have helped even more; CPU usage peaked at 18% and quickly dropped as soon as some of the files were finished.

EDIT:
I made further tests.

The figure of 18% seems quite reasonable to me: on a machine with 24 logical (HT) cores, the expected CPU usage per core is 100/24 ≈ 4.2% if each core processes one file exclusively, and since I used 4 files, 4 × 4.2% ≈ 17% is right around the observed 18%.

Also note the difference between ≈60s in ST mode vs ≈22s in MT mode. The expected MT runtime is not as simple as dividing 60/4 = 15s and asking whether the difference is MT overhead. The difference comes not only from MT overhead but also from the longest-running task, because the MT manager must wait at the very least for the longest task to finish (hashing the biggest, 5.7 GB file in this case), even if that were the only file left. For example, at the single-thread rate of roughly 15 GB / 58s ≈ 260 MB/s, the 5.7 GB file alone needs about 22s, which matches the observed MT times almost exactly.

To verify this, I copied a single file 24 times with sequential names, a total of 31 GB:

RamDisk:

  • MT: ≈9.5s(!) - Peak CPU: 86.9%.
  • ST: ≈125s(!) - Peak CPU: 5%

NVME:

  • MT: ≈20s - Peak CPU: 29.3%
  • ST: not measured

Basically no single file takes longer than any other. Note how the MT runtime drops immensely, from 22.3s to 9.5s on the RamDisk and to 20s on the NVME, despite twice the total size of the initial benchmark. Also note the CPU usage difference between the two: on a RamDisk, disk speed is not the bottleneck and all cores are utilized almost to the max, but even on a very fast NVME the CPU stops being the limiting factor and the disk read speed starts holding it back.

==> Multi-threading helps immensely, regardless of the MT overhead or disk speed, as long as it is not a classical HDD.
