module redlock.RedLock;

import std.uuid;
import std.string;
import std.random;
import std.conv;
import std.stdio;

import core.time;
import core.thread;
import core.stdc.stdlib;

import redis;

/**
 * Handle describing one lock acquisition.
 *
 * Filled in by `RedLock.Lock` and handed back to `RedLock.Unlock`.
 */
struct LockedObject
{
    /// Resource name as passed to `Lock` (without the internal key prefix).
    string key;
    /// Random token stored as the value of the lock key; only the holder of
    /// this token is allowed to delete the key.
    string uniqueId;
    /// Remaining validity of the lock in milliseconds, measured at the moment
    /// `Lock` returned `true`. Not modified when `Lock` fails.
    ulong validTime;
}

/**
 * Distributed lock over several Redis servers: a lock counts as acquired only
 * when a majority of the configured servers accepted it.
 */
class RedLock
{
    /**
     * Params:
     *   redis_servers = `;`-separated list of `host:port` entries; empty
     *                   entries are skipped.
     *   initConnected = when `true`, any connection failure is rethrown.
     *                   When `false`, failing servers are remembered and a
     *                   reconnect is attempted on every `Unlock`.
     */
    this(const string redis_servers, bool initConnected = true)
    {
        size_t configured = 0;

        foreach (hp; redis_servers.split(";"))
        {
            if (hp.length == 0)
                continue;

            ++configured;

            try
            {
                _redisClients ~= connect(hp);
            }
            catch (Throwable e)
            {
                if (initConnected)
                    throw e;

                // Remember the server so Unlock can retry the connection.
                _hostports[hp] = true;
            }
        }

        // The majority must be taken over ALL configured servers, not only
        // over those that happened to be reachable at construction time;
        // otherwise one reachable server out of three would form a "quorum".
        _quorum = configured / 2 + 1;
    }

    /**
     * Tries to acquire the lock for `key` until it succeeds or `timeout`
     * elapses.
     *
     * Params:
     *   key     = resource name to lock.
     *   lock    = receives `key`, the generated token and, on success, the
     *             remaining validity time.
     *   timeout = maximum time to keep retrying, in milliseconds.
     *   ttl     = lifetime of the lock key on the servers, in milliseconds
     *             (sent as the `px` argument of `set`).
     *
     * Returns: `true` when a quorum of servers granted the lock and it is
     *          still valid after accounting for elapsed time and clock drift.
     */
    bool Lock(const string key, ref LockedObject lock,
              uint timeout = uint.max, uint ttl = 60000)
    {
        string token = to!string(randomUUID());
        auto deadline = MonoTime.currTime + dur!"msecs"(timeout);

        synchronized (this)
        {
            lock.key = key;
            lock.uniqueId = token;

            do
            {
                size_t acquired = 0;
                auto started = MonoTime.currTime;

                foreach (c; _redisClients)
                {
                    if (LockInstance(c, key, token, ttl))
                        ++acquired;
                }

                // Everything below is in milliseconds and signed, so an
                // acquisition that took longer than the ttl yields a negative
                // validity instead of wrapping around.
                long elapsedMs = (MonoTime.currTime - started).total!"msecs";
                long clockDrift = cast(long)(_clockFactor * ttl) + 2;
                long validtime = cast(long) ttl - elapsedMs - clockDrift;

                if (validtime > 0 && acquired >= _quorum)
                {
                    lock.validTime = cast(ulong) validtime;
                    return true;
                }

                // Release whatever was partially acquired before retrying.
                Unlock(lock);

                // Random back-off in [_delaytime/2, _delaytime + _delaytime/2)
                // so competing clients do not retry in lock step.
                ulong delay = uniform(_delaytime / 2, _delaytime + _delaytime / 2);
                Thread.sleep(dur!"msecs"(delay));
            }
            while (MonoTime.currTime < deadline);

            return false;
        }
    }

    /**
     * Releases `lock` on every connected server, then tries to connect to the
     * servers that were unreachable so far.
     */
    void Unlock(const ref LockedObject lock)
    {
        synchronized (this)
        {
            foreach (c; _redisClients)
            {
                UnlockInstance(c, lock.key, lock.uniqueId);
            }

            // `.keys` is a copy, so removing entries while iterating is safe.
            foreach (hp; _hostports.keys)
            {
                try
                {
                    _redisClients ~= connect(hp);
                    _hostports.remove(hp);
                }
                catch (Throwable e)
                {
                    // Best effort: still unreachable, retry on the next Unlock.
                }
            }
        }
    }

private:

    /// Opens a connection to a single `host:port` entry.
    Redis connect(string hostport)
    {
        auto parts = hostport.split(":");
        return new Redis(parts[0], to!ushort(parts[1]));
    }

    /// Sets the prefixed key to `value` with `px ttl` and `nx`; any failure
    /// counts as "not acquired".
    bool LockInstance(Redis redis, const string key, const string value, uint ttl)
    {
        try
        {
            return redis.send!bool("set", _prefix ~ key, value, "px", ttl, "nx");
        }
        catch (Throwable e)
        {
            return false;
        }
    }

    /// Deletes the prefixed key only if it still holds `value`; failures are
    /// ignored (best effort).
    void UnlockInstance(Redis redis, const string key, const string value)
    {
        try
        {
            redis.eval(`if redis.call('get', KEYS[1]) == ARGV[1]
                then return redis.call('del', KEYS[1])
                else
                    return 0
                end`, [_prefix ~ key], [value]);
        }
        catch (Throwable e)
        {
            // Best effort: nothing to do if the server cannot be reached.
        }
    }

    /// Connected servers.
    Redis[] _redisClients;

    /// Configured servers that could not be connected yet (used as a set).
    bool[string] _hostports;

    /// Number of servers that must grant the lock.
    immutable ulong _quorum;
    /// Base retry delay in milliseconds.
    immutable ulong _delaytime = 10;
    /// Fraction of the ttl reserved as clock-drift allowance.
    immutable float _clockFactor = 0.01;

    /// Prefix prepended to every key written to Redis.
    static immutable string _prefix = "Dlang_RedLock_";
}

// Contention test: 100 threads compete for the same key for 30 seconds.
// Requires a Redis server listening on 127.0.0.1:6379.
unittest
{
    import core.thread;
    import std.stdio;
    import std.conv;

    class Test : Thread
    {
        string _name;
        int _second;
        bool _flag;
        RedLock _lock;

        this(string name, int second)
        {
            super(&run);
            _name = name;
            _second = second;
            _lock = new RedLock("127.0.0.1:6379");
            _flag = true;
        }

        /// Asks the worker loop to finish after its current iteration.
        void stop()
        {
            _flag = false;
        }

        void run()
        {
            while (_flag)
            {
                LockedObject obj;
                if (!_lock.Lock("test1", obj, 1000))
                {
                    writeln(" timeout failed ", _name);
                    continue;
                }

                writeln(_name, " locked");
                Thread.sleep(dur!"msecs"(500));
                writeln(_name, " un locked");
                _lock.Unlock(obj);
                Thread.sleep(dur!"seconds"(1));
            }
        }
    }

    Test[] list;
    for (uint i = 0; i < 100; i++)
    {
        auto test = new Test(to!string(i), 1);
        test.start();
        list ~= test;
    }

    Thread.sleep(dur!"seconds"(30));

    foreach (t; list)
    {
        t.stop();
    }

    foreach (t; list)
        t.join();
}