Lab 7 - Example 3 : Multiple File Sort Using Supervisor Worker
Download the code and the large files that need to be sorted
Tuple Space interface
/**
 * The TupleSpace interface: a store of objects addressed by a string tag.
 *
 * <p>Objects are deposited with {@link #out} and retrieved either
 * destructively ({@link #in}, {@link #inp}) or non-destructively
 * ({@link #rd}, {@link #rdp}). The un-suffixed retrieval operations block
 * until an object is available; the {@code p}-suffixed ones return
 * {@code null} immediately when nothing is available.
 */
public interface TupleSpace {

    /**
     * Deposits data in the tuple space.
     *
     * @param tag  tag under which the object is stored
     * @param data object to deposit
     */
    void out(String tag, Object data);

    /**
     * Extracts an object with the given tag from the tuple space, blocking
     * if none is available.
     *
     * @param tag tag of the object to extract
     * @return the extracted object
     * @throws InterruptedException declared so that blocking implementations
     *         can abort the wait when the calling thread is interrupted
     */
    Object in(String tag) throws InterruptedException;

    /**
     * Reads an object with the given tag from the tuple space without
     * extracting it, blocking if none is available.
     *
     * @param tag tag of the object to read
     * @return the object that was read
     * @throws InterruptedException declared so that blocking implementations
     *         can abort the wait when the calling thread is interrupted
     */
    Object rd(String tag) throws InterruptedException;

    /**
     * Extracts an object with the given tag if one is available.
     *
     * @param tag tag of the object to extract
     * @return the extracted object, or {@code null} if none is available
     */
    Object inp(String tag);

    /**
     * Reads an object with the given tag if one is available, without
     * extracting it.
     *
     * @param tag tag of the object to read
     * @return the object that was read, or {@code null} if none is available
     */
    Object rdp(String tag);
}
Tuple Space Implementation
import java.util.*; public class TupleSpaceImpl implements TupleSpace { private Hashtable tuples = new Hashtable(); //Hashtable of Vectors // deposits data in tuple space public synchronized void out (String tag, Object data) { Vector v = (Vector) tuples.get(tag); if (v==null) { v=new Vector(); tuples.put(tag,v); } v.addElement(data); notifyAll(); } private Object get(String tag, boolean remove) { Vector v = (Vector) tuples.get(tag); if (v==null) return null; if (v.size()==0) return null; Object o = v.firstElement(); if (remove) v.removeElementAt(0); return o; } // extracts object with tag from tuple space, blocks if not available public synchronized Object in (String tag) throws InterruptedException { Object o; while ((o=get(tag,true))==null) wait(); return o; } // reads object with tag from tuple space, blocks if not available public synchronized Object rd (String tag) throws InterruptedException { Object o; while ((o=get(tag,false))==null) wait(); return o; } // extracts object if available, return null if not available public synchronized Object inp (String tag) { return get(tag,true); } //reads object if available, return null if not available public synchronized Object rdp (String tag) { return get(tag,false); } }
Supervisor
import java.io.*; import java.util.*; /** * Reads one or more files to be sorted and puts their contents into the * tuple space and then gets the results (the sorted lists) from the tuple space * @see TupleSpaceImpl * @see TupleSpace */ public class FileSortSupervisor extends Thread{ private TupleSpace ts = null; private String files[]; public FileSortSupervisor(TupleSpace _ts, String _files[]){ ts = _ts; files = _files; } public void run(){ //for every file, read all lines and put them into tuple space //tuple space can be any object and the tag for it try{ for(String file: files){ FileReader fr = new FileReader(file); BufferedReader br = new BufferedReader(fr); List<String> lines = new ArrayList<String>(); String line = ""; while((line = br.readLine()) != null) lines.add(line); //put the lines into the TupleSpace ts.out("task", lines); } }catch(IOException ioex){ ioex.printStackTrace(); } //create a list to which the results from worker will be added List list = new ArrayList(); //get the results from the workers for(int i = 0; i < files.length; i++){ Object result = null; try{ result = ts.in("result"); }catch(InterruptedException iex){ //ignore } //the result will be a list so cast it to a list List sortedList = (List)result; //add the sortedList to the list list.addAll(sortedList); } //Write the sorted out put to a file try{ FileWriter fw = new FileWriter("sorted.txt"); String s = ""; for(Object obj : list){ s = (String) obj; fw.write(s +"\n"); } fw.close(); }catch(IOException ioex){ ioex.printStackTrace(); } //put "stop" tuples into the tuple space so that workers can stop //the number of "stop" tuples must be same as the number of workers //otherwise workers will hang and never finish ts.out("task","stop"); ts.out("task","stop"); } }
Worker
import java.util.*; /** * The implementation of worker follows the discussion * from the lecture - algorithm for worker */ public class SortWorker extends Thread{ private TupleSpace ts = null; public SortWorker(TupleSpace _ts){ ts = _ts; } public void run(){ while(true){ //take a task from the Tuple space Object task = null; try{ task = ts.in("task"); }catch(InterruptedException iex){} //check the type of task using instanceof if(task instanceof String){ String s = (String) task; //if the task is the "stop" tuple - stop this worker thread if(s.equals("stop")) break; } //task is a list so cast it to a list List list = (List) task; //do some work - sort the list Collections.sort(list); //"out" the result (the list after sorting) into the tuple space ts.out("result", list); } } }
Main program which starts the supervisor and workers and initializes the TupleSpace
/** * The program which starts the Supervisor and workers. * It also creates a tuple space which will be used by both the supervisors * and workers. * The example here uses many files which are not sorted and have large number * of lines(5-10K lines per file). The supervisor puts the contents of each file * (as a list) into the tuple space. A worker takes the unsorted list from the * tuple space and sorts it and puts the sorted list (the result) into the tuple * space. The supervisor takes the result from all workers and merges them into * a single list and writes them to a file (see code for FileSortSupervisor) * Lastly the supervisor puts the "stop" tuple into the tuple space so that the * workers can stop running. * @see FileSortSupervisor * @see SortWorker */ public class FileSortSuperWorker{ public static void main(String args[]){ if(args.length == 0){ System.out.println("No files to process"); System.out.println("Usage:java FileSortSuperWorker file1 file2 .."); System.exit(1); } String files[] = args; //these are names of the files to be sorted //The Tuple space which is the Connector between supervisor and workers TupleSpace ts = new TupleSpaceImpl(); //create a supervisor and pass it all files which need to sorted and //merged after sorting by workers Thread fss = new FileSortSupervisor(ts, files); fss.start(); //create worker threads and pass to them the reference of tuple space Thread sortWorker1 = new SortWorker(ts); Thread sortWorker2 = new SortWorker(ts); //start the workers sortWorker1.start(); sortWorker2.start(); } }
page revision: 5, last edited: 16 Apr 2011 11:22