Thursday, January 2, 2014

Java multithreading problem: the same data is fetched repeatedly

Description of the thread's workflow:
1. The thread queries data from a database table (table1) and, while traversing the results, updates each record's state (to prevent the same data from being fetched again). (A global lock was added here.)
2. Call an external interface and obtain the returned status.
3. Insert the data into another table (table2) and delete the corresponding rows from table1.

The code:
Data-access class MessageMgrFacadeImpl:
public synchronized List findPushList(HashMap searchMap) {
    this.status = transactionManager.getTransaction(definition);
    definition.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRED);
    // fetch all unsent messages
    List list = this.queryForList("findPushList", searchMap);
    try {
        // update each message's state
        for (int n = 0; n < list.size(); n++) {
            HashMap listMap = (HashMap) list.get(n);
            searchMap.put("smsId", listMap.get("SMS_ID"));
            this.update("updatePushListById", searchMap);
        }
        transactionManager.commit(status);
    } catch (Exception e) {
        transactionManager.rollback(status);
        throw new RuntimeException(e);
    }
    return list;
}

public synchronized void insertPushLog(HashMap searchMap) {
    this.status = transactionManager.getTransaction(definition);
    definition.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRED);
    try {
        this.insert("insertPushLog", searchMap);   // record the operation in table2
        this.delete("deletePushList", searchMap);  // delete the original table1 row
        transactionManager.commit(status);
    } catch (Exception e) {
        transactionManager.rollback(status);
        throw new RuntimeException(e);
    }
}

The worker thread class:

public class Pusher implements Runnable {
    private Message message = new Message();
    private HashMap<String, Object> searchMap = new HashMap<String, Object>();
    private volatile boolean stop = false;
    private MessageMgrFacadeImpl messageMgrFacadeImpl = null;

    @Override
    public void run() {
        // obtain the DAO from Spring
        ApplicationContext context = new ClassPathXmlApplicationContext(
                "applicationContext.xml");
        messageMgrFacadeImpl = (MessageMgrFacadeImpl) context
                .getBean("messageMgrFacade");
        searchMap.put("pushTime", DateUtil.getCurrentTimeFull());
        searchMap.put("maxCount", Config.getInstance().getMaxCount());
        // fetch the unsent message records
        List list = messageMgrFacadeImpl.findPushList(searchMap);
        if (list.size() > 0) {
            for (int i = 0; i < list.size(); i++) {
                HashMap listMap = (HashMap) list.get(i);
                System.out.println("++++==" + i + ":" + listMap);
                // ... interface call (elided); it yields smsId and returnResult ...
                HashMap<String, Object> search = new HashMap<String, Object>();
                search.put("smsId", smsId);
                search.put("errorCode", returnResult);
                search.put("errorMsg", ReturnMessage.getInstance()
                        .getMsgByCode(returnResult));
                // save the returned result
                messageMgrFacadeImpl.insertPushLog(search);
            }
        } else {
            // sleep 1 s
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                Log4jInitialize.logger(ManagerThread.class).error(
                        "Pusher.run exception: " + e);
                e.printStackTrace();
            }
        }
    }
}

The question:
During execution some of the data is processed more than once. Because the two tables share the same primary keys, this causes duplicate-primary-key errors on insert (especially while new data is being written into table1 at the same time).
------ Solution --------------------------------------------
List list = messageMgrFacadeImpl.findPushList(searchMap);
Try adding synchronization here.
------ Solution --------------------------------------------
Use a database lock: while one thread is using certain rows, other threads cannot touch them and must either wait or skip them.
That does not help with inserts, though. For the duplicate-primary-key errors on insert, synchronizing the method that generates the primary key should solve it.
------ Solution --------------------------------------------
Add breakpoints or debug logging around this problem to find the cause. My guess is that the transaction usage is at fault.
------ Solution --------------------------------------------
Debug it and look at the specific cause.
------ Solution --------------------------------------------
This is not a synchronization problem but a problem with how you generate primary keys. table2 reuses table1's primary keys: after rows are deleted from table1, a newly generated key may collide with the key of a previously deleted row, and that key still exists in table2, so the insert fails with a duplicate primary key. You can handle it in one of these ways:
1. When you delete a row from table1, also delete the row with the same primary key from table2, if deleting it is acceptable.
2. If you delete from table1 but must keep the matching rows in table2, then make sure table1's primary keys are unique and never reused: use a database auto-increment/identity column for table1's primary key, or define your own method that generates a unique key, typically timestamp + random number.
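A minimal sketch of the "timestamp plus suffix" generator from suggestion 2. The advice says "timestamp + random number"; this sketch appends an atomic counter instead of a pure random suffix, which removes even the small chance of a same-millisecond collision. All names here are illustrative, not from the original code.

```java
import java.util.concurrent.atomic.AtomicLong;

// Never-repeating key generator: 13-digit millisecond timestamp
// followed by a 5-digit rolling counter.
public final class KeyGenerator {
    private static final AtomicLong COUNTER = new AtomicLong();

    private KeyGenerator() { }

    public static String nextKey() {
        return String.format("%d%05d",
                System.currentTimeMillis(),
                COUNTER.getAndIncrement() % 100000);
    }

    public static void main(String[] args) {
        System.out.println(KeyGenerator.nextKey());
        System.out.println(KeyGenerator.nextKey()); // differs from the first
    }
}
```

An Oracle sequence (which the poster later says table1 already uses) gives the same guarantee server-side; this is only useful when keys must be generated in the application.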
------ Solution --------------------------------------------
Your program is unnecessarily complex.

Writing synchronization primitives in the run() method makes no sense, because the lock is only valid within a single instance. You clearly create a new instance every time:
Pusher p = new Pusher();
Thread t = new Thread(p);
t.start();
so each thread holds its own lock; synchronizing a non-static method cannot coordinate across instances.

I still feel the problem lies at the database level. Does your SELECT use FOR UPDATE?
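Since every Pusher is a fresh instance, per-instance synchronized blocks never exclude each other. A minimal runnable sketch of the cross-instance coordination this reply describes, using one static lock shared by all instances (the in-memory "pending" list stands in for the unsent rows in table1; all names are illustrative):

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class StaticLockDemo implements Runnable {
    // One monitor shared by ALL instances, unlike synchronized(this).
    private static final Object FETCH_LOCK = new Object();
    private static final List<Integer> pending =
            Collections.synchronizedList(new ArrayList<>());
    static final List<Integer> fetched =
            Collections.synchronizedList(new ArrayList<>());

    @Override
    public void run() {
        while (true) {
            Integer row;
            synchronized (FETCH_LOCK) {
                if (pending.isEmpty()) return;
                row = pending.remove(0); // fetch + "mark as taken" atomically
            }
            fetched.add(row);            // "process" outside the lock
        }
    }

    public static void main(String[] args) throws InterruptedException {
        for (int i = 0; i < 100; i++) pending.add(i);
        Thread t1 = new Thread(new StaticLockDemo()); // two separate instances,
        Thread t2 = new Thread(new StaticLockDemo()); // one shared static lock
        t1.start(); t2.start();
        t1.join();  t2.join();
        System.out.println(fetched.size()); // prints 100: no row fetched twice
    }
}
```

The key point is that the fetch and the "mark as taken" step happen under the same lock, which is what the posted code fails to guarantee across instances.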
------ For reference only ---------------------------------
I don't see where you actually call Pusher from multiple threads.
------ For reference only ---------------------------------

I use a timer that fires once a minute.
This is the thread-management class:
public class ManagerThread extends Thread {
    private volatile boolean stop = false;
    private final LinkedList<Thread> sendThreadList = new LinkedList<Thread>();
    private static final Object sendLock = new Object();
    private int sendThreadCount = 1;

    public ManagerThread() {
        this.setDaemon(true);
    }

    public ManagerThread(int sendThreadCount) {
        this.sendThreadCount = sendThreadCount;
        this.setDaemon(true);
    }

    @Override
    public void run() {
        while (!stop) {
            try {
                synchronized (sendLock) {
                    for (int i = 0; i < sendThreadList.size(); i++) {
                        Thread t = sendThreadList.get(i);
                        if (t.getState() == Thread.State.TERMINATED) {
                            sendThreadList.remove(i);
                            break;
                        }
                    }
                    if (sendThreadList.size() < this.sendThreadCount) {
                        Pusher p = new Pusher();
                        Thread t = new Thread(p);
                        t.start();
                        sendThreadList.add(t);
                    }
                }
                // sleep 30 s
                TimeUnit.SECONDS.sleep(30);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    public void myStop() {
        synchronized (sendLock) {
            for (int i = 0; i < this.sendThreadList.size(); i++) {
                Thread t = sendThreadList.get(i);
                if (t != null) {
                    t.stop(); // note: Thread.stop() is deprecated and unsafe
                }
            }
            this.sendThreadList.clear();
        }
        this.stop = true;
    }
}
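As an aside, the Thread.stop() call in myStop() is deprecated and can leave shared state corrupted mid-update. A minimal sketch of the cooperative alternative, stopping a worker via interrupt(); the class name is illustrative:

```java
public class StoppableWorker implements Runnable {
    @Override
    public void run() {
        while (!Thread.currentThread().isInterrupted()) {
            try {
                Thread.sleep(1000);                 // stand-in for one unit of work
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // restore the flag and exit
                return;
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Thread t = new Thread(new StoppableWorker());
        t.start();
        t.interrupt();                  // ask it to stop instead of t.stop()
        t.join(2000);
        System.out.println(t.isAlive()); // prints false: it exited cleanly
    }
}
```

The worker decides when it is safe to exit, so no transaction or list update is cut off halfway.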

sendThreadCount sets the number of worker threads:
public class PushMain {
    private ManagerThread tm;
    private boolean stop = false;

    public void start() {
        tm = new ManagerThread(Config.getInstance().getSendThreadCount());
        tm.start();
    }

    public void stop() {
        if (tm != null) {
            tm.myStop();
        }
        stop = true;
    }
}

The task is started with:
PushMain pushMain = new PushMain();
pushMain.start();
------ For reference only ---------------------------------
public synchronized List findPushList(HashMap searchMap)
Isn't adding synchronized here enough?
------ For reference only ---------------------------------

@Override
public void run() {
    // obtain the DAO from Spring
    ApplicationContext context = new ClassPathXmlApplicationContext(
            "applicationContext.xml");
    messageMgrFacadeImpl = (MessageMgrFacadeImpl) context
            .getBean("doone-education-messageMgrFacade");
    searchMap.put("pushTime", DateUtil.getCurrentTimeFull());
    searchMap.put("maxCount", Config.getInstance().getMaxCount());
    // fetch the unsent message records
    synchronized (this) {
        List list = messageMgrFacadeImpl.findPushList(searchMap);
        if (list.size() > 0) {
            for (int i = 0; i < list.size(); i++) {
                HashMap listMap = (HashMap) list.get(i);
                System.out.println("++++==" + i + ":" + listMap);
                // ... interface call (elided); it yields smsId and returnResult ...
                HashMap<String, Object> search = new HashMap<String, Object>();
                search.put("smsId", smsId);
                search.put("errorCode", returnResult);
                search.put("errorMsg", ReturnMessage.getInstance()
                        .getMsgByCode(returnResult));
                // save the returned result
                messageMgrFacadeImpl.insertPushLog(search);
            }
        } else {
            // sleep 1 s
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                Log4jInitialize.logger(ManagerThread.class).error(
                        "Pusher.run exception: " + e);
                e.printStackTrace();
            }
        }
    }
}

I wrapped the whole operation in one lock, but the same problem is still there!
------ For reference only ---------------------------------
After the insert completes you still modify the state of the corresponding record; those two steps also need to be synchronized together.
------ For reference only ---------------------------------

But those are exactly the places where the errors occur.
------ For reference only ---------------------------------

table1's primary key comes from an Oracle sequence, so it cannot repeat. I added output inside the for loop and found that the rows are exactly identical: the same data is being fetched repeatedly; the data itself is not duplicated.
------ For reference only ---------------------------------

If the SQL is awkward to change, process the List after the query. It depends on how your findPushList(HashMap searchMap) is written: changing the SQL statement should solve it, or simply turn the List into a Set.
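A minimal sketch of the "turn the List into a Set" suggestion: dedupe the rows returned by findPushList on their SMS_ID, keeping the first occurrence. The class and method names are illustrative; note this only hides the symptom, since two threads can still each fetch their own copy of the same row.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;

public class DedupeDemo {
    // Drop rows whose SMS_ID was already seen, preserving order.
    static List<HashMap<String, Object>> dedupeBySmsId(
            List<HashMap<String, Object>> rows) {
        LinkedHashSet<Object> seen = new LinkedHashSet<>();
        List<HashMap<String, Object>> unique = new ArrayList<>();
        for (HashMap<String, Object> row : rows) {
            if (seen.add(row.get("SMS_ID"))) { // add() is false for a duplicate
                unique.add(row);
            }
        }
        return unique;
    }

    public static void main(String[] args) {
        List<HashMap<String, Object>> rows = new ArrayList<>();
        for (Object id : new Object[] { 1L, 2L, 1L }) {
            HashMap<String, Object> row = new HashMap<>();
            row.put("SMS_ID", id);
            rows.add(row);
        }
        System.out.println(dedupeBySmsId(rows).size()); // prints 2
    }
}
```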
------ For reference only ---------------------------------

I just removed the synchronization lock and added FOR UPDATE NOWAIT SKIP LOCKED. Tested locally, the problem is gone; I haven't dared to put it on the server yet, ha ha! Here is the SQL statement (iBATIS):

SELECT T.SMS_ID,
       T.USER_ID,
       T.SMS_TYPE,
       T.SCH_ID,
       T.SMS_TITLE,
       T.SMS_CONTENT,
       TO_CHAR(T.SMS_CREATE_DATE,'YYYY-MM-DD HH24:MI:SS') CREATE_DATE,
       TO_CHAR(T.SMS_PUSH_DATE,'YYYY-MM-DD HH24:MI:SS') PUSH_DATE,
       T.SCH_LEVEL
  FROM SMS_PUSH T
 WHERE T.SMS_ID IN (
       SELECT * FROM (
              SELECT A.SMS_ID
                FROM SMS_PUSH A
               WHERE 1=1
                 AND A.SMS_PUSH_STATE IS NULL
<isNotEmpty property="userType">
                 AND A.SMS_TYPE = #userType#
</isNotEmpty>
<isNotEmpty property="pushTime">
                 AND TO_CHAR(A.SMS_PUSH_DATE,'YYYY-MM-DD HH24:MI:SS') &lt; #pushTime#
</isNotEmpty>
               ORDER BY A.SMS_ID
       ) WHERE ROWNUM &lt;= $maxCount$
       ) FOR UPDATE NOWAIT SKIP LOCKED

------ For reference only ---------------------------------

Unfortunately, as soon as I deployed it the error came back; the same duplication.
Can some great god give a solution?
The requirement is to continuously take data from table1, process it, and write the results to table2.
------ For reference only ---------------------------------

Don't lock at the database layer; that can cause a lot of unnecessary trouble. You can try the following suggestions:
1. Your synchronized code has a problem: messageMgrFacadeImpl is not a singleton, so synchronizing its methods is moot. Make messageMgrFacadeImpl a singleton, or make its methods static.
2. Looking at the SQL statement, it should not return duplicate data; the data in the database itself may be the problem. Run:
select t1.SMS_ID from table1 t1, table2 t2 where t1.SMS_ID = t2.SMS_ID;
If the query returns rows, the data in the database itself contains duplicates; clean them up as your situation requires. Also think about why the data got into that state: earlier failed operations, or the SMS_ID generation approach really is flawed.
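A minimal sketch of suggestion 1, making the facade a process-wide singleton so that its synchronized methods actually share one monitor. The class name mirrors the posted code, but the body is a placeholder, not the real DAO logic:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;

public class MessageMgrFacadeImpl {
    private static final MessageMgrFacadeImpl INSTANCE = new MessageMgrFacadeImpl();

    private MessageMgrFacadeImpl() { } // no public construction

    public static MessageMgrFacadeImpl getInstance() {
        return INSTANCE;
    }

    // synchronized now means: one caller at a time, process-wide.
    public synchronized List<HashMap<String, Object>> findPushList(
            HashMap<String, Object> searchMap) {
        // ... query table1 and mark the rows, as in the posted code ...
        return new ArrayList<>();
    }
}
```

Worth checking in the posted code: each Pusher.run() builds a brand-new ClassPathXmlApplicationContext, so every thread may be getting its own bean instance even though Spring beans are singletons per context; sharing one ApplicationContext across threads would have a similar effect to this sketch.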
------ For reference only ---------------------------------


Are you using Hibernate?

Two strategies:
1. Push transaction control down to the database level.
Use the "FOR UPDATE" keyword in the Select for explicit locking, i.e. SELECT * FROM table1 WHERE conditions FOR UPDATE: [begin transaction] select only one or a few rows, then immediately update their flag [commit]; [begin transaction] then process them one by one (insert into table2) and, on success or failure, update the table1 flag [commit]. Each transaction must be short enough not to hurt performance, but not so short that you lose control.
As for your application's problem: the code is too messy to see clearly, but with transactions pushed down to the database level this should not happen; I suspect your flag handling is wrong.

2. Single-threaded task assignment, multi-threaded execution.
Prepare a thread pool: Pool;
then prepare a worker: Worker, which processes the data assigned to it and writes the result to table2;
and a separate task dispatcher: Dispatcher, which alone identifies the table1 rows that need processing, instantiates a Worker, passes it the data, and throws it into the pool. If the pool has too many pending tasks, the dispatcher rests for a while.

Regarding 17F's second point: check whether the raw data itself contains duplicates, and whether the condition columns you select on include the primary key.
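A minimal runnable sketch of strategy 2: one dispatcher hands out rows, a fixed pool of workers processes them. The in-memory queues stand in for table1 and table2, and all names are illustrative; because only the dispatcher reads from "table1", no row can be handed out twice.

```java
import java.util.Queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class DispatcherDemo {
    static final BlockingQueue<Integer> table1 = new LinkedBlockingQueue<>();
    static final Queue<Integer> table2 = new ConcurrentLinkedQueue<>();

    public static void main(String[] args) throws Exception {
        for (int i = 0; i < 50; i++) table1.add(i); // pending rows

        ExecutorService pool = Executors.newFixedThreadPool(4);
        // Dispatcher: the ONLY reader of table1, so each row is assigned once.
        Integer row;
        while ((row = table1.poll()) != null) {
            final Integer r = row;
            pool.submit(() -> table2.add(r * 10)); // Worker: process, write table2
        }
        pool.shutdown();
        pool.awaitTermination(10, TimeUnit.SECONDS);
        System.out.println(table2.size()); // prints 50: each row handled once
    }
}
```

This matches the poster's closing conclusion as well: fetch in one thread, process in parallel.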
------ For reference only ---------------------------------

Thanks for the help and ideas, great gods; closing the thread, ha ha! Having watched it for a few days, I think the cause is that I put the data fetching inside the child threads, which caused illegal data sharing between them. Per the original design intent the direction should be: the main thread fetches the data and assigns it to child threads to process in parallel. When I have time I'll try your approaches, ha ha!
