1 module redlock.RedLock;
2 
3 import std.uuid;
4 import std.string;
5 import std.random;
6 import std.conv;
7 import std.stdio;
8 
9 import core.time;
10 import core.thread;
11 import core.stdc.stdlib;
12 
13 import redis;
14 
15 
16 
/// Handle describing one lock acquisition attempt made through `RedLock.Lock`.
/// The same object is later passed to `RedLock.Unlock` to release the lock.
struct LockedObject
{
	/// Resource name as passed to `RedLock.Lock` (stored without the Redis key prefix).
	string  key;
	/// Random UUID string generated for each `Lock` call; `Unlock` only deletes
	/// the Redis key when its stored value still equals this id.
	string  uniqueId;
	/// Remaining validity written by `Lock` on success: the ttl minus the time
	/// spent acquiring and a clock-drift allowance.
	/// NOTE(review): presumably milliseconds, like ttl — confirm the unit conversion in `Lock`.
	ulong 	validTime;
}
23 
24 
/**
 * Distributed mutex built on several independent Redis servers.
 *
 * A lock is considered held when the same key/value pair could be set
 * (`SET ... PX ttl NX`) on a majority of the configured servers and the
 * time spent doing so has not already consumed the key's time-to-live.
 *
 * Access to the underlying `Redis` clients is serialised with the object's
 * monitor, so one instance may be used from several threads.
 */
class RedLock
{

	/**
	 * Connects to every server listed in `redis_servers`.
	 *
	 * Params:
	 *   redis_servers = semicolon-separated list of `host:port` entries;
	 *                   empty entries are skipped.
	 *   initConnected = when `true`, the first entry that cannot be connected
	 *                   (or is malformed) makes the constructor throw; when
	 *                   `false`, such entries are remembered and a new
	 *                   connection attempt is made on every `Unlock`.
	 *
	 * Throws: the connection/parsing `Exception` when `initConnected` is `true`.
	 */
	this(const string redis_servers , bool initConnected = true){

		size_t configured = 0;

		foreach( hp ; redis_servers.split(";"))
		{
			if(hp.length == 0)
				continue;

			++configured;

			try{
				_redisClients ~= connect(hp);
			}
			catch(Exception e)
			{
				if(initConnected)
					throw e;

				// Keep the entry so that Unlock can retry the connection later.
				_hostports[hp] = true;
			}
		}

		// The majority must be taken over ALL configured servers, not only
		// over those that happened to be reachable at construction time;
		// otherwise a minority of servers could grant the lock.
		_quornum = configured / 2 + 1;
	}


	/**
	 * Tries to acquire the lock named `key`, retrying until `timeout` expires.
	 *
	 * Params:
	 *   key     = resource name (the Redis key is `_prefix ~ key`).
	 *   lock    = receives the key and the unique id of this attempt; on
	 *             success also the remaining validity time in milliseconds.
	 *   timeout = maximum time to keep retrying, in milliseconds. At least
	 *             one attempt is always made.
	 *   ttl     = time-to-live of the Redis keys, in milliseconds.
	 *
	 * Returns: `true` when a majority of servers accepted the key and the
	 *          lock is still valid, `false` when the timeout elapsed.
	 */
	bool Lock(const string key , ref LockedObject lock ,
		uint timeout = uint.max , uint ttl = 60000  )
	{
		string val = to!string(randomUUID());
		auto deadline = MonoTime.currTime + msecs(timeout);

		lock.key = key;
		lock.uniqueId = val;

		do{
			// The monitor is held only for one acquisition round, never
			// while sleeping, so that other threads sharing this instance
			// can still call Unlock while this thread is waiting.
			synchronized(this)
			{
				ulong n = 0;
				auto started = MonoTime.currTime;
				foreach(c ; _redisClients)
				{
					if(LockInstance(c , key, val , ttl)) ++n;
				}

				// All quantities are in milliseconds and signed, so an
				// acquisition that took longer than the ttl yields a
				// negative validity instead of wrapping around.
				long elapsed = (MonoTime.currTime - started).total!"msecs";
				long clockdrift = cast(long)(_clockFactor * ttl) + 2;
				long validtime = cast(long)ttl - elapsed - clockdrift;

				if(validtime > 0 && n >= _quornum)
				{
					lock.validTime = cast(ulong)validtime;
					return true;
				}

				// Not enough servers (or too slow): release whatever was set.
				Unlock(lock);
			}

			// Random back-off in [_delaytime/2, _delaytime/2 + _delaytime)
			// milliseconds to de-synchronise competing clients.
			ulong span = _delaytime;
			ulong delay = uniform(0UL , span) + span / 2;
			Thread.sleep(msecs(cast(long)delay));
		}while(MonoTime.currTime < deadline);

		return false;
	}

	/**
	 * Releases `lock` on every connected server (only where the stored value
	 * still matches `lock.uniqueId`) and then retries the connection to the
	 * servers that were unreachable so far.
	 */
	void Unlock(const ref LockedObject lock)
	{
		synchronized(this)
		{
			foreach(c ; _redisClients)
			{
				UnlockInstance(c , lock.key  , lock.uniqueId);
			}

			reconnectPending();
		}

	}

private:

	/// Parses a `host:port` entry and opens a client for it.
	/// Throws an `Exception` when the entry has no port part, when the port
	/// is not a valid `ushort`, or when the `Redis` constructor throws.
	static Redis connect(string hostport)
	{
		auto parts = hostport.split(":");
		if(parts.length < 2)
			throw new Exception("RedLock: expected \"host:port\" but got \"" ~ hostport ~ "\"");

		return new Redis(parts[0] , to!ushort(parts[1]));
	}

	/// Attempts to connect to every server remembered in `_hostports`;
	/// entries that connect are moved to `_redisClients`, the rest stay queued.
	void reconnectPending()
	{
		// `.keys` is a snapshot, so removing entries while looping is safe.
		foreach(k ; _hostports.keys)
		{
			try{
				_redisClients ~= connect(k);
				_hostports.remove(k);
			}
			catch(Exception e)
			{
				// Still unreachable: keep the entry for the next attempt.
			}
		}
	}

	/// Sets `_prefix ~ key` to `value` on one server, only if the key does not
	/// exist yet, with an expiry of `ttl` milliseconds. Any exception from
	/// the client counts as "not acquired on this server".
	bool LockInstance(Redis redis , const string key , const string value ,  uint ttl)
	{
		try{
			return redis.send!bool("set" , _prefix ~ key , value , "px" , ttl , "nx");
		}
		catch(Exception e)
		{
			return false;
		}
	}

	/// Deletes `_prefix ~ key` on one server, but only when it still holds
	/// `value`, so a lock taken over by somebody else is never removed.
	/// Release is best effort: exceptions from the client are ignored
	/// because the key expires on its own.
	void UnlockInstance(Redis redis , const string key , const string value)
	{
		try{
			redis.eval(`if redis.call('get', KEYS[1]) == ARGV[1] 
							then return redis.call('del', KEYS[1])
						else 
							return 0 
						end`,[_prefix ~ key] , [value]);
		}
		catch(Exception e)
		{
			// Deliberately ignored, see above.
		}

	}

	/// Clients for the servers that are currently connected.
	Redis[]						_redisClients;

	/// `host:port` entries that could not be connected yet (used as a set).
	bool[string]				_hostports;

	/// Number of servers that must accept the key for the lock to be held.
	immutable ulong				_quornum;
	/// Base of the random retry delay, in milliseconds.
	immutable ulong 			_delaytime = 10;
	/// Fraction of the ttl reserved as clock-drift allowance.
	immutable float 			_clockFactor = 0.01;


	/// Prefix put in front of every lock key stored in Redis.
	static immutable string		_prefix	= "Dlang_RedLock_";
}
164 
// Integration test: 100 threads, each with its own RedLock instance, compete
// for the lock "test1" for 30 seconds. It needs a Redis server listening on
// 127.0.0.1:6379 and only prints its progress; nothing is asserted.
unittest{


	import core.atomic : atomicLoad , atomicStore;
	import core.thread;
	import std.stdio;
	import std.conv;
	class Test:Thread
	{
		string 	_name;
		int 	_second;
		// Written by the main thread (stop) and polled by the worker (run),
		// therefore accessed atomically.
		shared bool  	_flag;
		RedLock _lock;
		this(string name , int second)
		{
			super(&run);
			_name = name;
			_second = second;
			_lock = new RedLock("127.0.0.1:6379");
			atomicStore(_flag , true);
		}

		/// Asks the worker loop to finish after its current iteration.
		void stop()
		{
			atomicStore(_flag , false);
		}

		/// Repeatedly takes the lock, holds it for 500 ms, releases it and
		/// pauses for one second, until stop() has been called.
		void run()
		{
			while(atomicLoad(_flag))
			{
				LockedObject obj;
				if(!_lock.Lock("test1" ,obj , 1000))
				{
					writeln(" timeout failed ", _name);
					continue;
				}

				writeln(_name , " locked");
				Thread.sleep(dur!"msecs"(500));
				writeln(_name , " un locked");
				_lock.Unlock(obj);
				Thread.sleep(dur!"seconds"(1));
			}

		}
	}


	Test[] list;
	foreach(uint i ; 0 .. 100)
	{
		auto test = new Test(to!string(i) , 1);
		test.start();
		list ~= test;
	}

	Thread.sleep(dur!"seconds"(30));

	foreach(t ; list)
	{
		t.stop();
	}

	foreach( t; list)
		t.join();

}