upas/fs: handle plumbing for new messages for concurrent index updates
when multiple upas/fs instances are running on the same index, another upas/fs could have written the index, but we still want to plumb the message. so we introduce another cstate flag "Cnew" that is set when a message that we havnt seen before by rdidx().
This commit is contained in:
parent
99ed9623b5
commit
81274ea0cf
5 changed files with 32 additions and 23 deletions
|
@ -375,7 +375,8 @@ static char *itab[] = {
|
|||
"idx",
|
||||
"stale",
|
||||
"header",
|
||||
"body"
|
||||
"body",
|
||||
"new",
|
||||
};
|
||||
|
||||
char*
|
||||
|
|
|
@ -6,6 +6,7 @@ enum {
|
|||
Cidxstale = 1<<1,
|
||||
Cheader = 1<<2,
|
||||
Cbody = 1<<3,
|
||||
Cnew = 1<<4,
|
||||
|
||||
/* encodings */
|
||||
Enone= 0,
|
||||
|
@ -65,8 +66,8 @@ struct Idx {
|
|||
char *idxaux; /* mailbox specific */
|
||||
|
||||
char *type; /* mime info */
|
||||
char disposition;
|
||||
char *filename;
|
||||
char disposition;
|
||||
|
||||
int nparts;
|
||||
};
|
||||
|
@ -117,9 +118,9 @@ struct Message {
|
|||
|
||||
/* mime info */
|
||||
char *charset;
|
||||
char encoding;
|
||||
char *boundary;
|
||||
char converted;
|
||||
char encoding;
|
||||
char decoded;
|
||||
char mimeflag;
|
||||
|
||||
|
@ -208,7 +209,7 @@ void putcache(Mailbox*, Message*); /* asymmetricial */
|
|||
long cachefree(Mailbox*, Message*, int);
|
||||
|
||||
Message* gettopmsg(Mailbox*, Message*);
|
||||
char* syncmbox(Mailbox*);
|
||||
char* syncmbox(Mailbox*, int);
|
||||
void* emalloc(ulong);
|
||||
void* erealloc(void*, ulong);
|
||||
Message* newmessage(Message*);
|
||||
|
|
|
@ -1008,7 +1008,7 @@ readmboxdir(Fid *f, uchar *buf, long off, int cnt, int blen)
|
|||
Message *msg;
|
||||
|
||||
if(off == 0)
|
||||
syncmbox(f->mb);
|
||||
syncmbox(f->mb, 1);
|
||||
|
||||
n = 0;
|
||||
if(f->mb->ctl){
|
||||
|
@ -1343,7 +1343,7 @@ rstat(Fid *f)
|
|||
Dir d;
|
||||
|
||||
if(FILE(f->qid.path) == Qmbox)
|
||||
syncmbox(f->mb);
|
||||
syncmbox(f->mb, 1);
|
||||
mkstat(&d, f->mb, f->m, FILE(f->qid.path));
|
||||
rhdr.nstat = convD2M(&d, mbuf, messagesize - IOHDRSZ);
|
||||
rhdr.stat = mbuf;
|
||||
|
@ -1465,7 +1465,7 @@ reader(void)
|
|||
}
|
||||
}
|
||||
if(mb != nil) {
|
||||
syncmbox(mb);
|
||||
syncmbox(mb, 1);
|
||||
qunlock(&synclock);
|
||||
} else {
|
||||
qunlock(&synclock);
|
||||
|
|
|
@ -383,7 +383,6 @@ dead:
|
|||
m->cstate |= Cidx;
|
||||
idprint("→%.2ux\n", m->cstate);
|
||||
free(s);
|
||||
// s = 0;
|
||||
continue;
|
||||
}
|
||||
m = newmessage(parent);
|
||||
|
@ -412,7 +411,7 @@ dead:
|
|||
m->nparts = strtoul(f[21], 0, 0);
|
||||
|
||||
m->cstate &= ~Cidxstale;
|
||||
m->cstate |= Cidx;
|
||||
m->cstate |= Cidx|Cnew;
|
||||
m->str = s;
|
||||
s = 0;
|
||||
|
||||
|
|
|
@ -62,7 +62,7 @@ static void mailplumb(Mailbox*, Message*);
|
|||
* do we want to plumb flag changes?
|
||||
*/
|
||||
char*
|
||||
syncmbox(Mailbox *mb)
|
||||
syncmbox(Mailbox *mb, int doplumb)
|
||||
{
|
||||
char *s;
|
||||
int n, d, y, a;
|
||||
|
@ -83,13 +83,20 @@ syncmbox(Mailbox *mb)
|
|||
y = 0;
|
||||
for(m = mb->root->part; m; m = next){
|
||||
next = m->next;
|
||||
if((m->cstate & Cidx) == 0 && m->deleted == 0){
|
||||
cachehash(mb, m);
|
||||
if(insurecache(mb, m) == 0){
|
||||
mailplumb(mb, m);
|
||||
msgdecref(mb, m);
|
||||
if(m->deleted == 0){
|
||||
if((m->cstate & Cidx) == 0){
|
||||
cachehash(mb, m);
|
||||
m->cstate |= Cnew;
|
||||
n++;
|
||||
} else if(!doplumb)
|
||||
m->cstate &= ~Cnew;
|
||||
if(m->cstate & Cnew){
|
||||
if(insurecache(mb, m) == 0){
|
||||
mailplumb(mb, m);
|
||||
msgdecref(mb, m);
|
||||
}
|
||||
m->cstate &= ~Cnew;
|
||||
}
|
||||
n++;
|
||||
}
|
||||
if(m->cstate & Cidxstale)
|
||||
y++;
|
||||
|
@ -98,7 +105,8 @@ syncmbox(Mailbox *mb)
|
|||
if(mb->delete && m->inmbox && m->deleted & Deleted)
|
||||
mb->delete(mb, m);
|
||||
if(!m->inmbox){
|
||||
mailplumb(mb, m);
|
||||
if(doplumb)
|
||||
mailplumb(mb, m);
|
||||
delmessage(mb, m);
|
||||
d++;
|
||||
}
|
||||
|
@ -253,7 +261,7 @@ newmbox(char *path, char *name, int flags, Mailbox **r)
|
|||
if(r)
|
||||
*r = mb;
|
||||
|
||||
return syncmbox(mb);
|
||||
return syncmbox(mb, 0);
|
||||
}
|
||||
|
||||
/* close the named mailbox */
|
||||
|
@ -279,7 +287,7 @@ syncallmboxes(void)
|
|||
Mailbox *m;
|
||||
|
||||
for(m = mbl; m != nil; m = m->next)
|
||||
if(err = syncmbox(m))
|
||||
if(err = syncmbox(m, 0))
|
||||
eprint("syncmbox: %s\n", err);
|
||||
}
|
||||
|
||||
|
@ -1070,7 +1078,7 @@ delmessages(int ac, char **av)
|
|||
break;
|
||||
}
|
||||
if(needwrite)
|
||||
syncmbox(mb);
|
||||
syncmbox(mb, 1);
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -1100,7 +1108,7 @@ flagmessages(int argc, char **argv)
|
|||
needwrite = 1;
|
||||
}
|
||||
if(needwrite)
|
||||
syncmbox(mb);
|
||||
syncmbox(mb, 1);
|
||||
return rerr;
|
||||
}
|
||||
|
||||
|
@ -1117,7 +1125,7 @@ msgdecref(Mailbox *mb, Message *m)
|
|||
m->refs--;
|
||||
if(m->refs == 0){
|
||||
if(m->deleted)
|
||||
syncmbox(mb);
|
||||
syncmbox(mb, 1);
|
||||
else
|
||||
putcache(mb, m);
|
||||
}
|
||||
|
@ -1149,7 +1157,7 @@ mboxdecref(Mailbox *mb)
|
|||
assert(mb->refs > 0);
|
||||
mb->refs--;
|
||||
if(mb->refs == 0){
|
||||
syncmbox(mb);
|
||||
syncmbox(mb, 1);
|
||||
delmessage(mb, mb->root);
|
||||
if(mb->ctl)
|
||||
hfree(PATH(mb->id, Qmbox), "ctl");
|
||||
|
|
Loading…
Reference in a new issue