Commits

Matt Knepley committed 0447884

DMPlex: Replaced algorithm for label distribution with one that does not suck
- Need more tests

Comments (0)

Files changed (1)

src/dm/impls/plex/plex.c

       char           *name;
       PetscInt       *stratumSizes = NULL, *points = NULL;
       PetscMPIInt    *sendcnts     = NULL, *offsets = NULL, *displs = NULL;
-      PetscInt        nameSize, s, p;
+      PetscInt        nameSize, s, p, proc;
       PetscBool       isdepth;
       size_t          len = 0;
 
       /* Bcast stratumValues (could filter for no points in stratum) */
       if (!rank) {ierr = PetscMemcpy(newLabel->stratumValues, next->stratumValues, next->numStrata * sizeof(PetscInt));CHKERRQ(ierr);}
       ierr = MPI_Bcast(newLabel->stratumValues, newLabel->numStrata, MPIU_INT, 0, comm);CHKERRQ(ierr);
-      /* Find size on each process and Scatter */
+      /* Find size on each process and Scatter
+           we use the fact that both the stratum points and partArray are sorted */
       if (!rank) {
         ierr = ISGetIndices(part, &partArray);CHKERRQ(ierr);
         ierr = PetscMalloc(numProcs*next->numStrata * sizeof(PetscInt), &stratumSizes);CHKERRQ(ierr);
         ierr = PetscMemzero(stratumSizes, numProcs*next->numStrata * sizeof(PetscInt));CHKERRQ(ierr);
-        for (s = 0; s < next->numStrata; ++s) {
-          for (p = next->stratumOffsets[s]; p < next->stratumOffsets[s]+next->stratumSizes[s]; ++p) {
-            const PetscInt point = next->points[p];
-            PetscInt       proc;
-
-            for (proc = 0; proc < numProcs; ++proc) {
-              PetscInt dof, off, pPart;
-
-              ierr = PetscSectionGetDof(partSection, proc, &dof);CHKERRQ(ierr);
-              ierr = PetscSectionGetOffset(partSection, proc, &off);CHKERRQ(ierr);
-              for (pPart = off; pPart < off+dof; ++pPart) {
-                if (partArray[pPart] == point) {
-                  ++stratumSizes[proc*next->numStrata+s];
-                  break;
-                }
+        /* TODO We should switch to using binary search if the label is a lot smaller than partitions */
+        for (proc = 0; proc < numProcs; ++proc) {
+          PetscInt dof, off;
+
+          ierr = PetscSectionGetDof(partSection, proc, &dof);CHKERRQ(ierr);
+          ierr = PetscSectionGetOffset(partSection, proc, &off);CHKERRQ(ierr);
+          for (s = 0; s < next->numStrata; ++s) {
+            PetscInt lStart = next->stratumOffsets[s], lEnd = next->stratumOffsets[s]+next->stratumSizes[s];
+            PetscInt pStart = off,                     pEnd = off+dof;
+
+            while (pStart < pEnd && lStart < lEnd) {
+              if (partArray[pStart] > next->points[lStart]) {
+                ++lStart;
+              } else if (next->points[lStart] > partArray[pStart]) {
+                ++pStart;
+              } else {
+                ++stratumSizes[proc*next->numStrata+s];
+                ++pStart; ++lStart;
               }
             }
           }
           displs[p+1] = displs[p] + sendcnts[p];
         }
         ierr = PetscMalloc(displs[numProcs] * sizeof(PetscInt), &points);CHKERRQ(ierr);
-        for (s = 0; s < next->numStrata; ++s) {
-          for (p = next->stratumOffsets[s]; p < next->stratumOffsets[s]+next->stratumSizes[s]; ++p) {
-            const PetscInt point = next->points[p];
-            PetscInt       proc;
-
-            for (proc = 0; proc < numProcs; ++proc) {
-              PetscInt dof, off, pPart;
-
-              ierr = PetscSectionGetDof(partSection, proc, &dof);CHKERRQ(ierr);
-              ierr = PetscSectionGetOffset(partSection, proc, &off);CHKERRQ(ierr);
-              for (pPart = off; pPart < off+dof; ++pPart) {
-                if (partArray[pPart] == point) {
-                  points[offsets[proc]++] = point;
-                  break;
-                }
+        /* TODO We should switch to using binary search if the label is a lot smaller than partitions */
+        for (proc = 0; proc < numProcs; ++proc) {
+          PetscInt dof, off;
+
+          ierr = PetscSectionGetDof(partSection, proc, &dof);CHKERRQ(ierr);
+          ierr = PetscSectionGetOffset(partSection, proc, &off);CHKERRQ(ierr);
+          for (s = 0; s < next->numStrata; ++s) {
+            PetscInt lStart = next->stratumOffsets[s], lEnd = next->stratumOffsets[s]+next->stratumSizes[s];
+            PetscInt pStart = off,                     pEnd = off+dof;
+
+            while (pStart < pEnd && lStart < lEnd) {
+              if (partArray[pStart] > next->points[lStart]) {
+                ++lStart;
+              } else if (next->points[lStart] > partArray[pStart]) {
+                ++pStart;
+              } else {
+                points[offsets[proc]++] = next->points[lStart];
+                ++pStart; ++lStart;
               }
             }
           }